[teiid-commits] teiid SVN: r1223 - in trunk/engine/src/main/java/com/metamatrix/common/buffer: impl and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Sat Aug 8 22:37:27 EDT 2009


Author: shawkins
Date: 2009-08-08 22:37:26 -0400 (Sat, 08 Aug 2009)
New Revision: 1223

Removed:
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchComparator.java
Modified:
   trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSourceID.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
   trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
Log:
TEIID-761 fixing inefficient buffer algorithms

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSourceID.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSourceID.java	2009-08-09 02:07:34 UTC (rev 1222)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSourceID.java	2009-08-09 02:37:26 UTC (rev 1223)
@@ -114,7 +114,7 @@
             return true;
         }
 
-        if(obj == null || ! (obj instanceof TupleSourceID)) {
+        if(! (obj instanceof TupleSourceID)) {
             return false;
         }
 

Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchComparator.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchComparator.java	2009-08-09 02:07:34 UTC (rev 1222)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchComparator.java	2009-08-09 02:37:26 UTC (rev 1223)
@@ -1,70 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer.impl;
-
-import java.util.Comparator;
-
-/**
- * Comparator that compares batches based on their begin row.
- * Just a handy convenience.
- */
-class BatchComparator implements Comparator {
-
-    /**
-     * Constructor for BatchComparator.
-     */
-    public BatchComparator() {
-        super();
-    }
-
-    /**
-     * Compare two TupleBatch objects and return comparison value
-     * based on the begin rows of the batches
-     * @see java.util.Comparator#compare(Object, Object)
-     * @param o1 First TupleBatch
-     * @param o2 Second TupleBatch
-     * @return -1, 0, or 1 as o1 compares to o2
-     */
-    public int compare(Object o1, Object o2) {
-        if(o1 == null) { 
-            return -1;
-        } else if(o2 == null) { 
-            return 1;
-        }
-        
-        ManagedBatch batch1 = (ManagedBatch) o1;
-        ManagedBatch batch2 = (ManagedBatch) o2;
-        
-        long last1 = batch1.getLastAccessed();
-        long last2 = batch2.getLastAccessed();
-        
-        if(last1 < last2) {
-            return -1;
-        } else if(last1 > last2) {
-            return 1;
-        } else {
-            return 0;
-        }
-    }
-
-}

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2009-08-09 02:07:34 UTC (rev 1222)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java	2009-08-09 02:37:26 UTC (rev 1223)
@@ -74,7 +74,7 @@
     // Cache tuple source info in memory 
     private Map<TupleSourceID, TupleSourceInfo> tupleSourceMap = new ConcurrentHashMap<TupleSourceID, TupleSourceInfo>();
     // groupName (String) -> TupleGroupInfo map
-    private Map groupInfos = new HashMap();
+    private Map<String, TupleGroupInfo> groupInfos = new HashMap<String, TupleGroupInfo>();
 
     // Storage managers
     private StorageManager memoryMgr;
@@ -160,14 +160,14 @@
         this.memoryState.fillStats(stats);
         
         // Get picture of what's happening
-        Set copyKeys = tupleSourceMap.keySet();
-        Iterator infoIter = copyKeys.iterator();
+        Set<TupleSourceID> copyKeys = tupleSourceMap.keySet();
+        Iterator<TupleSourceID> infoIter = copyKeys.iterator();
         stats.numTupleSources = copyKeys.size();
 
         // Walk through each info and count where all the batches are -
         // lock only a single info at a time to minimize locking
         while(infoIter.hasNext()) {
-            Object key = infoIter.next();
+            TupleSourceID key = infoIter.next();
             TupleSourceInfo info = tupleSourceMap.get(key);
             if(info == null) {
                 continue;
@@ -248,7 +248,11 @@
     throws MetaMatrixComponentException {
 
         TupleSourceID newID = new TupleSourceID(String.valueOf(this.currentTuple.getAndIncrement()), this.lookup);
-		TupleSourceInfo info = new TupleSourceInfo(newID, schema, types, getGroupInfo(groupName), tupleSourceType);
+        TupleGroupInfo tupleGroupInfo = getGroupInfo(groupName);
+		TupleSourceInfo info = new TupleSourceInfo(newID, schema, types, tupleGroupInfo, tupleSourceType);
+		synchronized (tupleGroupInfo) {
+            tupleGroupInfo.getTupleSourceIDs().add(newID);
+		}
 		tupleSourceMap.put(newID, info);
 
         if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
@@ -301,7 +305,10 @@
                 }
             }
         }
-
+        TupleGroupInfo tupleGroupInfo = info.getGroupInfo();
+        synchronized (tupleGroupInfo) {
+			tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
+		}
         // Remove memory storage
         this.memoryMgr.removeBatches(tupleSourceID);
 
@@ -330,37 +337,22 @@
             LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, new Object[]{"Removing TupleSources for group", groupName}); //$NON-NLS-1$
         }
 
-        // Get tuple sources to remove
-        List removeList = new ArrayList();
-    	Iterator iter = this.tupleSourceMap.keySet().iterator();
-    	while(iter.hasNext()) {
-    		TupleSourceID tsID = (TupleSourceID) iter.next();
-    		TupleSourceInfo info = this.tupleSourceMap.get(tsID);
-            if(info != null && ! info.isRemoved()) {
-                // group names can be null - have to do a comparison with nulls here
-                String infoGroup = info.getGroupInfo().getGroupName();
-                if(infoGroup == null) {
-                    if(groupName == null) {
-                        removeList.add(tsID);
-                    }
-                } else if(infoGroup.equals(groupName)) {
-                    removeList.add(tsID);
-                }
-    		}
-    	}
-        
+        TupleGroupInfo tupleGroupInfo = null;
         synchronized(groupInfos) {
-            groupInfos.remove(groupName);
+        	tupleGroupInfo = groupInfos.remove(groupName);
         }
-
+        if (tupleGroupInfo == null) {
+        	return;
+        }
+        List<TupleSourceID> tupleSourceIDs = null;
+        synchronized (tupleGroupInfo) {
+			tupleSourceIDs = new ArrayList<TupleSourceID>(tupleGroupInfo.getTupleSourceIDs());
+		}
         // Remove them
-        if(removeList.size() > 0) {
+        if(tupleSourceIDs.size() > 0) {
 	        MetaMatrixComponentException ex = null;
 
-	        Iterator removeIter = removeList.iterator();
-	        while(removeIter.hasNext()) {
-	     		TupleSourceID tsID = (TupleSourceID) removeIter.next();
-
+	        for (TupleSourceID tsID : tupleSourceIDs) {
 	     		try {
 	     			this.removeTupleSource(tsID);
 	     		} catch(TupleSourceNotFoundException e) {
@@ -883,7 +875,7 @@
     private TupleGroupInfo getGroupInfo(String groupName) {
         TupleGroupInfo info = null;
         synchronized(groupInfos) {
-            info = (TupleGroupInfo)groupInfos.get(groupName);
+            info = groupInfos.get(groupName);
             if (info == null) {
                 // If it doesn't exist, create a new one
                 info = new TupleGroupInfo(groupName);

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java	2009-08-09 02:07:34 UTC (rev 1222)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java	2009-08-09 02:37:26 UTC (rev 1223)
@@ -53,9 +53,9 @@
     private int beginRow;
     private int endRow;
     private long size;    
-    private long lastAccessed; 
     private int location;  
     private int pinnedCount;
+    private int hashCode;
     
     // logging
     private static int STACK_LEVELS_TO_OMIT = 2;
@@ -72,7 +72,7 @@
         this.beginRow = begin;
         this.endRow = end;
         this.size = size;
-        updateLastAccessed();       
+        this.hashCode = HashCodeUtil.hashCode(tupleSourceID.hashCode(), beginRow);
     }
     
     /**
@@ -108,21 +108,6 @@
     }
     
     /**
-     * Get the last accessed timestamp.
-     * @return Last accessed timestamp
-     */    
-    public long getLastAccessed() {
-        return this.lastAccessed;
-    }
-    
-    /**
-     * Update the last accessed timestamp from system clock
-     */
-    public void updateLastAccessed() {
-        lastAccessed = System.currentTimeMillis();    
-    }    
-    
-    /**
      * Get the location of the batch, as defined in constants
      * @return Location
      */
@@ -212,10 +197,11 @@
      * @return Hash code
      */
     public int hashCode() {
-        return HashCodeUtil.hashCode(beginRow, tupleSourceID);
+        return hashCode;
     }
     
     public String toString() {
         return "ManagedBatch[" + tupleSourceID + ", " + beginRow + ", " + endRow + "]"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$ //$NON-NLS-4$
     }
+
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java	2009-08-09 02:07:34 UTC (rev 1222)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java	2009-08-09 02:37:26 UTC (rev 1223)
@@ -23,11 +23,13 @@
 package com.metamatrix.common.buffer.impl;
 
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import com.metamatrix.common.buffer.TupleSourceID;
 import com.metamatrix.common.log.LogManager;
@@ -54,7 +56,7 @@
  */
 class MemoryState {
     
-    private static ThreadLocal PINNED_BY_THREAD = new ThreadLocal();
+    private static ThreadLocal<Map<TupleSourceID, Map<Integer, ManagedBatch>>> PINNED_BY_THREAD = new ThreadLocal<Map<TupleSourceID, Map<Integer, ManagedBatch>>>();
     
     //memory availability when reserveMemory() is called
     static final int MEMORY_AVAILABLE = 1;
@@ -68,11 +70,10 @@
     private volatile long memoryUsed = 0;
 
     // Track the currently pinned stuff by TupleSourceID for easy lookup
-    private Map pinned = new HashMap();     
+    private Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned = new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>();     
     
     // Track the currently unpinned stuff in a sorted list, sorted by access time
-    private static final Comparator BATCH_COMPARATOR = new BatchComparator();
-    private List unpinned = new ArrayList();
+    private Set<ManagedBatch> unpinned = Collections.synchronizedSet(new LinkedHashSet<ManagedBatch>());
     
     /**
      * Constructor for MemoryState, based on config.
@@ -158,9 +159,9 @@
         synchronized (this) {
             addPinnedInternal(pinned, batch);
         }
-        Map theadPinned = (Map)PINNED_BY_THREAD.get();
+        Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned = PINNED_BY_THREAD.get();
         if (theadPinned == null) {
-            theadPinned = new HashMap();
+            theadPinned = new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>();
             PINNED_BY_THREAD.set(theadPinned);
         }
         addPinnedInternal(theadPinned, batch);
@@ -169,11 +170,11 @@
         }
     }
 
-    private void addPinnedInternal(Map pinnedMap, ManagedBatch batch) {
+    private void addPinnedInternal(Map<TupleSourceID, Map<Integer, ManagedBatch>> pinnedMap, ManagedBatch batch) {
         TupleSourceID tsID = batch.getTupleSourceID();
-        Map tsPinned = (Map) pinnedMap.get(tsID);
+        Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
         if(tsPinned == null) {
-            tsPinned = new HashMap();
+            tsPinned = new HashMap<Integer, ManagedBatch>();
             pinnedMap.put(tsID, tsPinned);
         } 
         
@@ -193,7 +194,7 @@
             result = removePinnedInternal(pinned, tsID, beginRow);
         }
         if (result != null) {
-            Map theadPinned = (Map)PINNED_BY_THREAD.get();
+        	Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned = PINNED_BY_THREAD.get();
             if (theadPinned != null) {
                 removePinnedInternal(theadPinned, tsID, beginRow);
             }
@@ -201,11 +202,11 @@
         return result;
     }
 
-    private ManagedBatch removePinnedInternal(Map pinnedMap, TupleSourceID tsID,
+    private ManagedBatch removePinnedInternal(Map<TupleSourceID, Map<Integer, ManagedBatch>> pinnedMap, TupleSourceID tsID,
                                               int beginRow) {
-        Map tsPinned = (Map) pinnedMap.get(tsID);
+        Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
         if(tsPinned != null) { 
-            ManagedBatch mbatch = (ManagedBatch) tsPinned.remove(new Integer(beginRow));
+            ManagedBatch mbatch = tsPinned.remove(new Integer(beginRow));
             
             if(tsPinned.size() == 0) { 
                 pinnedMap.remove(tsID);
@@ -220,28 +221,15 @@
      * Add an unpinned batch
      * @param batch Unpinned batch to add
      */        
-    public synchronized void addUnpinned(ManagedBatch batch) {
-        batch.updateLastAccessed();
-  
-        if(unpinned.isEmpty()) {
-            unpinned.add(batch);
-            return;
-        }
-        int size = unpinned.size() - 1;
-        for(int i=size; i>=0; i--) {
-            ManagedBatch listBatch = (ManagedBatch)unpinned.get(i); 
-            if(BATCH_COMPARATOR.compare(batch, listBatch) >= 0) {
-                unpinned.add(i + 1, batch);
-                break;
-            } 
-        }
+    public void addUnpinned(ManagedBatch batch) {
+        unpinned.add(batch);
     }
     
     /**
      * Remove an unpinned batch
      * @param batch Batch to remove
      */
-    public synchronized void removeUnpinned(ManagedBatch batch) {
+    public void removeUnpinned(ManagedBatch batch) {
         unpinned.remove(batch);
     }
 
@@ -254,17 +242,19 @@
      * should check that each batch is still unpinned.
      * @return Safe (but possibly out of date) iterator on unpinned batches
      */
-    public synchronized Iterator getAllUnpinned() {
-        List copy = new ArrayList(unpinned);
-        return copy.iterator();    
+    public Iterator<ManagedBatch> getAllUnpinned() {
+    	synchronized (unpinned) {
+            List<ManagedBatch> copy = new ArrayList<ManagedBatch>(unpinned);
+            return copy.iterator();    
+		}
     }
     
-    public synchronized Map getAllPinned() {
-        return new HashMap(pinned);    
+    public synchronized Map<TupleSourceID, Map<Integer, ManagedBatch>> getAllPinned() {
+        return new HashMap<TupleSourceID, Map<Integer, ManagedBatch>>(pinned);    
     }
         
-    public Map getPinnedByCurrentThread() {
-        return (Map)PINNED_BY_THREAD.get();
+    public Map<TupleSourceID, Map<Integer, ManagedBatch>> getPinnedByCurrentThread() {
+        return PINNED_BY_THREAD.get();
     }
 
 }

Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java	2009-08-09 02:07:34 UTC (rev 1222)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java	2009-08-09 02:37:26 UTC (rev 1223)
@@ -22,8 +22,13 @@
 
 package com.metamatrix.common.buffer.impl;
 
+import java.util.HashSet;
+import java.util.Set;
 
+import com.metamatrix.common.buffer.TupleSourceID;
 
+
+
 /** 
  * Represents a logical grouping of tuple sources managed by the buffer manager.
  * Tuple sources are typically grouped by session, and the groupName is typically a sessionID/connectionID.
@@ -34,11 +39,16 @@
     private String groupName;
     /** The bytes of memory used by this tuple group*/
     private long memoryUsed;
+    private Set<TupleSourceID> tupleSourceIDs = new HashSet<TupleSourceID>();
     
     TupleGroupInfo(String groupName) {
         this.groupName = groupName;
     }
     
+    public Set<TupleSourceID> getTupleSourceIDs() {
+		return tupleSourceIDs;
+	}
+    
     String getGroupName() {
         return groupName;
     }



More information about the teiid-commits mailing list