Author: shawkins
Date: 2009-08-08 22:37:26 -0400 (Sat, 08 Aug 2009)
New Revision: 1223
Removed:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchComparator.java
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSourceID.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
Log:
TEIID-761 fixing inefficient buffer algorithms
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSourceID.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSourceID.java 2009-08-09
02:07:34 UTC (rev 1222)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/TupleSourceID.java 2009-08-09
02:37:26 UTC (rev 1223)
@@ -114,7 +114,7 @@
return true;
}
- if(obj == null || ! (obj instanceof TupleSourceID)) {
+ if(! (obj instanceof TupleSourceID)) {
return false;
}
Deleted:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchComparator.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchComparator.java 2009-08-09
02:07:34 UTC (rev 1222)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchComparator.java 2009-08-09
02:37:26 UTC (rev 1223)
@@ -1,70 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer.impl;
-
-import java.util.Comparator;
-
-/**
- * Comparator that compares batches based on their begin row.
- * Just a handy convenience.
- */
-class BatchComparator implements Comparator {
-
- /**
- * Constructor for BatchComparator.
- */
- public BatchComparator() {
- super();
- }
-
- /**
- * Compare two TupleBatch objects and return comparison value
- * based on the begin rows of the batches
- * @see java.util.Comparator#compare(Object, Object)
- * @param o1 First TupleBatch
- * @param o2 Second TupleBatch
- * @return -1, 0, or 1 as o1 compares to o2
- */
- public int compare(Object o1, Object o2) {
- if(o1 == null) {
- return -1;
- } else if(o2 == null) {
- return 1;
- }
-
- ManagedBatch batch1 = (ManagedBatch) o1;
- ManagedBatch batch2 = (ManagedBatch) o2;
-
- long last1 = batch1.getLastAccessed();
- long last2 = batch2.getLastAccessed();
-
- if(last1 < last2) {
- return -1;
- } else if(last1 > last2) {
- return 1;
- } else {
- return 0;
- }
- }
-
-}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-09
02:07:34 UTC (rev 1222)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-09
02:37:26 UTC (rev 1223)
@@ -74,7 +74,7 @@
// Cache tuple source info in memory
private Map<TupleSourceID, TupleSourceInfo> tupleSourceMap = new
ConcurrentHashMap<TupleSourceID, TupleSourceInfo>();
// groupName (String) -> TupleGroupInfo map
- private Map groupInfos = new HashMap();
+ private Map<String, TupleGroupInfo> groupInfos = new HashMap<String,
TupleGroupInfo>();
// Storage managers
private StorageManager memoryMgr;
@@ -160,14 +160,14 @@
this.memoryState.fillStats(stats);
// Get picture of what's happening
- Set copyKeys = tupleSourceMap.keySet();
- Iterator infoIter = copyKeys.iterator();
+ Set<TupleSourceID> copyKeys = tupleSourceMap.keySet();
+ Iterator<TupleSourceID> infoIter = copyKeys.iterator();
stats.numTupleSources = copyKeys.size();
// Walk through each info and count where all the batches are -
// lock only a single info at a time to minimize locking
while(infoIter.hasNext()) {
- Object key = infoIter.next();
+ TupleSourceID key = infoIter.next();
TupleSourceInfo info = tupleSourceMap.get(key);
if(info == null) {
continue;
@@ -248,7 +248,11 @@
throws MetaMatrixComponentException {
TupleSourceID newID = new
TupleSourceID(String.valueOf(this.currentTuple.getAndIncrement()), this.lookup);
- TupleSourceInfo info = new TupleSourceInfo(newID, schema, types,
getGroupInfo(groupName), tupleSourceType);
+ TupleGroupInfo tupleGroupInfo = getGroupInfo(groupName);
+ TupleSourceInfo info = new TupleSourceInfo(newID, schema, types, tupleGroupInfo,
tupleSourceType);
+ synchronized (tupleGroupInfo) {
+ tupleGroupInfo.getTupleSourceIDs().add(newID);
+ }
tupleSourceMap.put(newID, info);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.DETAIL)) {
@@ -301,7 +305,10 @@
}
}
}
-
+ TupleGroupInfo tupleGroupInfo = info.getGroupInfo();
+ synchronized (tupleGroupInfo) {
+ tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
+ }
// Remove memory storage
this.memoryMgr.removeBatches(tupleSourceID);
@@ -330,37 +337,22 @@
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, new Object[]{"Removing
TupleSources for group", groupName}); //$NON-NLS-1$
}
- // Get tuple sources to remove
- List removeList = new ArrayList();
- Iterator iter = this.tupleSourceMap.keySet().iterator();
- while(iter.hasNext()) {
- TupleSourceID tsID = (TupleSourceID) iter.next();
- TupleSourceInfo info = this.tupleSourceMap.get(tsID);
- if(info != null && ! info.isRemoved()) {
- // group names can be null - have to do a comparison with nulls here
- String infoGroup = info.getGroupInfo().getGroupName();
- if(infoGroup == null) {
- if(groupName == null) {
- removeList.add(tsID);
- }
- } else if(infoGroup.equals(groupName)) {
- removeList.add(tsID);
- }
- }
- }
-
+ TupleGroupInfo tupleGroupInfo = null;
synchronized(groupInfos) {
- groupInfos.remove(groupName);
+ tupleGroupInfo = groupInfos.remove(groupName);
}
-
+ if (tupleGroupInfo == null) {
+ return;
+ }
+ List<TupleSourceID> tupleSourceIDs = null;
+ synchronized (tupleGroupInfo) {
+ tupleSourceIDs = new
ArrayList<TupleSourceID>(tupleGroupInfo.getTupleSourceIDs());
+ }
// Remove them
- if(removeList.size() > 0) {
+ if(tupleSourceIDs.size() > 0) {
MetaMatrixComponentException ex = null;
- Iterator removeIter = removeList.iterator();
- while(removeIter.hasNext()) {
- TupleSourceID tsID = (TupleSourceID) removeIter.next();
-
+ for (TupleSourceID tsID : tupleSourceIDs) {
try {
this.removeTupleSource(tsID);
} catch(TupleSourceNotFoundException e) {
@@ -883,7 +875,7 @@
private TupleGroupInfo getGroupInfo(String groupName) {
TupleGroupInfo info = null;
synchronized(groupInfos) {
- info = (TupleGroupInfo)groupInfos.get(groupName);
+ info = groupInfos.get(groupName);
if (info == null) {
// If it doesn't exist, create a new one
info = new TupleGroupInfo(groupName);
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java 2009-08-09
02:07:34 UTC (rev 1222)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java 2009-08-09
02:37:26 UTC (rev 1223)
@@ -53,9 +53,9 @@
private int beginRow;
private int endRow;
private long size;
- private long lastAccessed;
private int location;
private int pinnedCount;
+ private int hashCode;
// logging
private static int STACK_LEVELS_TO_OMIT = 2;
@@ -72,7 +72,7 @@
this.beginRow = begin;
this.endRow = end;
this.size = size;
- updateLastAccessed();
+ this.hashCode = HashCodeUtil.hashCode(tupleSourceID.hashCode(), beginRow);
}
/**
@@ -108,21 +108,6 @@
}
/**
- * Get the last accessed timestamp.
- * @return Last accessed timestamp
- */
- public long getLastAccessed() {
- return this.lastAccessed;
- }
-
- /**
- * Update the last accessed timestamp from system clock
- */
- public void updateLastAccessed() {
- lastAccessed = System.currentTimeMillis();
- }
-
- /**
* Get the location of the batch, as defined in constants
* @return Location
*/
@@ -212,10 +197,11 @@
* @return Hash code
*/
public int hashCode() {
- return HashCodeUtil.hashCode(beginRow, tupleSourceID);
+ return hashCode;
}
public String toString() {
return "ManagedBatch[" + tupleSourceID + ", " + beginRow +
", " + endRow + "]"; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
//$NON-NLS-4$
}
+
}
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java 2009-08-09
02:07:34 UTC (rev 1222)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/MemoryState.java 2009-08-09
02:37:26 UTC (rev 1223)
@@ -23,11 +23,13 @@
package com.metamatrix.common.buffer.impl;
import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.log.LogManager;
@@ -54,7 +56,7 @@
*/
class MemoryState {
- private static ThreadLocal PINNED_BY_THREAD = new ThreadLocal();
+ private static ThreadLocal<Map<TupleSourceID, Map<Integer,
ManagedBatch>>> PINNED_BY_THREAD = new ThreadLocal<Map<TupleSourceID,
Map<Integer, ManagedBatch>>>();
//memory availability when reserveMemory() is called
static final int MEMORY_AVAILABLE = 1;
@@ -68,11 +70,10 @@
private volatile long memoryUsed = 0;
// Track the currently pinned stuff by TupleSourceID for easy lookup
- private Map pinned = new HashMap();
+ private Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned = new
HashMap<TupleSourceID, Map<Integer, ManagedBatch>>();
// Track the currently unpinned stuff in a sorted list, sorted by access time
- private static final Comparator BATCH_COMPARATOR = new BatchComparator();
- private List unpinned = new ArrayList();
+ private Set<ManagedBatch> unpinned = Collections.synchronizedSet(new
LinkedHashSet<ManagedBatch>());
/**
* Constructor for MemoryState, based on config.
@@ -158,9 +159,9 @@
synchronized (this) {
addPinnedInternal(pinned, batch);
}
- Map theadPinned = (Map)PINNED_BY_THREAD.get();
+ Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned =
PINNED_BY_THREAD.get();
if (theadPinned == null) {
- theadPinned = new HashMap();
+ theadPinned = new HashMap<TupleSourceID, Map<Integer,
ManagedBatch>>();
PINNED_BY_THREAD.set(theadPinned);
}
addPinnedInternal(theadPinned, batch);
@@ -169,11 +170,11 @@
}
}
- private void addPinnedInternal(Map pinnedMap, ManagedBatch batch) {
+ private void addPinnedInternal(Map<TupleSourceID, Map<Integer,
ManagedBatch>> pinnedMap, ManagedBatch batch) {
TupleSourceID tsID = batch.getTupleSourceID();
- Map tsPinned = (Map) pinnedMap.get(tsID);
+ Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
if(tsPinned == null) {
- tsPinned = new HashMap();
+ tsPinned = new HashMap<Integer, ManagedBatch>();
pinnedMap.put(tsID, tsPinned);
}
@@ -193,7 +194,7 @@
result = removePinnedInternal(pinned, tsID, beginRow);
}
if (result != null) {
- Map theadPinned = (Map)PINNED_BY_THREAD.get();
+ Map<TupleSourceID, Map<Integer, ManagedBatch>> theadPinned =
PINNED_BY_THREAD.get();
if (theadPinned != null) {
removePinnedInternal(theadPinned, tsID, beginRow);
}
@@ -201,11 +202,11 @@
return result;
}
- private ManagedBatch removePinnedInternal(Map pinnedMap, TupleSourceID tsID,
+ private ManagedBatch removePinnedInternal(Map<TupleSourceID, Map<Integer,
ManagedBatch>> pinnedMap, TupleSourceID tsID,
int beginRow) {
- Map tsPinned = (Map) pinnedMap.get(tsID);
+ Map<Integer, ManagedBatch> tsPinned = pinnedMap.get(tsID);
if(tsPinned != null) {
- ManagedBatch mbatch = (ManagedBatch) tsPinned.remove(new Integer(beginRow));
+ ManagedBatch mbatch = tsPinned.remove(new Integer(beginRow));
if(tsPinned.size() == 0) {
pinnedMap.remove(tsID);
@@ -220,28 +221,15 @@
* Add an unpinned batch
* @param batch Unpinned batch to add
*/
- public synchronized void addUnpinned(ManagedBatch batch) {
- batch.updateLastAccessed();
-
- if(unpinned.isEmpty()) {
- unpinned.add(batch);
- return;
- }
- int size = unpinned.size() - 1;
- for(int i=size; i>=0; i--) {
- ManagedBatch listBatch = (ManagedBatch)unpinned.get(i);
- if(BATCH_COMPARATOR.compare(batch, listBatch) >= 0) {
- unpinned.add(i + 1, batch);
- break;
- }
- }
+ public void addUnpinned(ManagedBatch batch) {
+ unpinned.add(batch);
}
/**
* Remove an unpinned batch
* @param batch Batch to remove
*/
- public synchronized void removeUnpinned(ManagedBatch batch) {
+ public void removeUnpinned(ManagedBatch batch) {
unpinned.remove(batch);
}
@@ -254,17 +242,19 @@
* should check that each batch is still unpinned.
* @return Safe (but possibly out of date) iterator on unpinned batches
*/
- public synchronized Iterator getAllUnpinned() {
- List copy = new ArrayList(unpinned);
- return copy.iterator();
+ public Iterator<ManagedBatch> getAllUnpinned() {
+ synchronized (unpinned) {
+ List<ManagedBatch> copy = new ArrayList<ManagedBatch>(unpinned);
+ return copy.iterator();
+ }
}
- public synchronized Map getAllPinned() {
- return new HashMap(pinned);
+ public synchronized Map<TupleSourceID, Map<Integer, ManagedBatch>>
getAllPinned() {
+ return new HashMap<TupleSourceID, Map<Integer,
ManagedBatch>>(pinned);
}
- public Map getPinnedByCurrentThread() {
- return (Map)PINNED_BY_THREAD.get();
+ public Map<TupleSourceID, Map<Integer, ManagedBatch>>
getPinnedByCurrentThread() {
+ return PINNED_BY_THREAD.get();
}
}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java 2009-08-09
02:07:34 UTC (rev 1222)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleGroupInfo.java 2009-08-09
02:37:26 UTC (rev 1223)
@@ -22,8 +22,13 @@
package com.metamatrix.common.buffer.impl;
+import java.util.HashSet;
+import java.util.Set;
+import com.metamatrix.common.buffer.TupleSourceID;
+
+
/**
* Represents a logical grouping of tuple sources managed by the buffer manager.
* Tuple sources are typically grouped by session, and the groupName is typically a
sessionID/connectionID.
@@ -34,11 +39,16 @@
private String groupName;
/** The bytes of memory used by this tuple group*/
private long memoryUsed;
+ private Set<TupleSourceID> tupleSourceIDs = new
HashSet<TupleSourceID>();
TupleGroupInfo(String groupName) {
this.groupName = groupName;
}
+ public Set<TupleSourceID> getTupleSourceIDs() {
+ return tupleSourceIDs;
+ }
+
String getGroupName() {
return groupName;
}