Author: shawkins
Date: 2009-08-10 23:25:47 -0400 (Mon, 10 Aug 2009)
New Revision: 1233
Added:
trunk/engine/src/test/java/com/metamatrix/common/buffer/BufferManagerFactory.java
Removed:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerFactory.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchMap.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchMapValueTranslator.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/memory/
trunk/engine/src/test/java/com/metamatrix/common/buffer/storage/memory/
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerPropertyNames.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/StorageManager.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBatchMap.java
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
trunk/engine/src/test/java/com/metamatrix/common/buffer/storage/file/TestFileStorageManager.java
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
trunk/server/src/main/java/com/metamatrix/server/dqp/service/PlatformBufferService.java
Log:
TEIID-767 merging memorystoremanager directly into the buffermanager.
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-11
01:26:05 UTC (rev 1232)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManager.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -90,12 +90,10 @@
int getConnectorBatchSize();
/**
- * Adds a {@link StorageManager} to this BufferManager instance. This
- * method may be called multiple times; it will be first called after the
- * call to {@link #initialize}.
+ * Adds a {@link StorageManager} to this BufferManager instance.
* @param storageManager Storage manager to add
*/
- void addStorageManager(StorageManager storageManager);
+ void setStorageManager(StorageManager storageManager);
/**
* Creates a tuple source based on a schema and properties describing
Deleted:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerFactory.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerFactory.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerFactory.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -1,103 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer;
-
-import java.util.Properties;
-
-import com.metamatrix.api.exception.MetaMatrixComponentException;
-import com.metamatrix.common.buffer.impl.BufferManagerImpl;
-import com.metamatrix.common.buffer.storage.file.FileStorageManager;
-import com.metamatrix.common.buffer.storage.memory.MemoryStorageManager;
-import com.metamatrix.common.util.PropertiesUtils;
-
-/**
- * <p>Factory for BufferManager instances. One method will get
- * a server buffer manager, as it should be instantiated in a running
- * MetaMatrix server. That BufferManager is configured mostly by the
- * passed in properties.</p>
- *
- * <p>The other method returns a stand-alone, in-memory buffer manager. This
- * is typically used for either in-memory testing or any time the
- * query processor component is not expected to run out of memory, such as
- * within the modeler.</p>
- */
-public class BufferManagerFactory {
-
- private static BufferManager INSTANCE;
-
- /**
- * Helper to get a buffer manager all set up for unmanaged standalone use. This is
- * typically used for testing or when memory is not an issue.
- * @return BufferManager ready for use
- */
- public static BufferManager getStandaloneBufferManager() throws
MetaMatrixComponentException {
- if (INSTANCE == null) {
- BufferManager bufferMgr = new BufferManagerImpl();
- Properties props = new Properties();
- props.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE,
String.valueOf(Long.MAX_VALUE));
- props.setProperty(BufferManagerPropertyNames.SESSION_USE_PERCENTAGE,
"100"); //$NON-NLS-1$
- props.setProperty(BufferManagerPropertyNames.LOG_STATS_INTERVAL, "0");
//$NON-NLS-1$
- props.setProperty(BufferManagerPropertyNames.MANAGEMENT_INTERVAL,
"0"); //$NON-NLS-1$
- bufferMgr.initialize("local", props); //$NON-NLS-1$
-
- // Add unmanaged memory storage manager
- bufferMgr.addStorageManager(new MemoryStorageManager());
- bufferMgr.addStorageManager(new MemoryStorageManager() {
- @Override
- public int getStorageType() {
- return StorageManager.TYPE_FILE;
- }
- });
- INSTANCE = bufferMgr;
- }
-
- return INSTANCE;
- }
-
- /**
- * Helper to get a buffer manager all set up for unmanaged standalone use. This is
- * typically used for testing or when memory is not an issue.
- * @param lookup Lookup implementation to use
- * @param props Configuration properties
- * @return BufferManager ready for use
- */
- public static BufferManager getServerBufferManager(String lookup, Properties props)
throws MetaMatrixComponentException {
- Properties bmProps = PropertiesUtils.clone(props, false);
- // Construct buffer manager
- BufferManager bufferManager = new BufferManagerImpl();
- bufferManager.initialize(lookup, bmProps);
-
- // Get the properties for FileStorageManager and create.
- StorageManager fsm = new FileStorageManager();
- fsm.initialize(bmProps);
- bufferManager.addStorageManager(fsm);
-
- // Create MemoryStorageManager
- StorageManager msm = new MemoryStorageManager();
- msm.initialize(bmProps);
- bufferManager.addStorageManager(msm);
-
- return bufferManager;
- }
-
-}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerPropertyNames.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerPropertyNames.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerPropertyNames.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -28,12 +28,6 @@
public final class BufferManagerPropertyNames {
/**
- * Optional property - the class name of the memory manager to use (must be an
implementation
- * of {@link com.metamatrix.buffer.impl.BufferIDCreator}.
- */
- public static final String ID_CREATOR = "metamatrix.buffer.idCreator";
//$NON-NLS-1$
-
- /**
* Optional property - the amount of memory (in megabytes) that buffer management
should use.
* This value should be a positive integer (less than the the max heap size) and
defaults to
* 128.
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/StorageManager.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/StorageManager.java 2009-08-11
01:26:05 UTC (rev 1232)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/StorageManager.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -39,11 +39,6 @@
public interface StorageManager {
/**
- * Constant for a StorageManager for in-memory storage
- */
- public static final int TYPE_MEMORY = 0;
-
- /**
* Constant for a StorageManager for database storage
*/
public static final int TYPE_DATABASE = 1;
@@ -68,16 +63,6 @@
throws MetaMatrixComponentException;
/**
- * Get the type of storage as defined by constants.
- * @return Storage type
- * @see StorageManager#TYPE_MEMORY
- * @see StorageManager#TYPE_DATABASE
- * @see StorageManager#TYPE_FILE
- * @see StorageManager#TYPE_REMOTE
- */
- int getStorageType();
-
- /**
* Add a batch to the storage manager.
* @param types a hint to the StorageManager about the types of data in the batch
* @throws MetaMatrixComponentException indicating a non-business-related
@@ -99,17 +84,6 @@
throws TupleSourceNotFoundException, MetaMatrixComponentException;
/**
- * Remove a batch from this storage as specified. If the tuple source
- * is unknown or the batch is unknown, a TupleSourceNotFoundException is
- * thrown.
- * @throws TupleSourceNotFoundException indicating the sourceID is unknown
- * @throws MetaMatrixComponentException indicating a non-business-related
- * exception (such as a communication exception)
- */
- void removeBatch(TupleSourceID sourceID, int beginRow)
- throws TupleSourceNotFoundException, MetaMatrixComponentException;
-
- /**
* Remove all batches for the specified tuple source. If no batches exist,
* no exception is thrown.
* @throws MetaMatrixComponentException indicating a non-business-related
Deleted: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchMap.java
===================================================================
--- trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchMap.java 2009-08-11
01:26:05 UTC (rev 1232)
+++ trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchMap.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -1,107 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer.impl;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeMap;
-
-
-/**
- * This class is used to save batches ordered by the first row.
- * @since 4.3
- */
-public class BatchMap {
- private TreeMap batches;
- private BatchMapValueTranslator translator;
-
- public BatchMap(BatchMapValueTranslator translator) {
- batches = new TreeMap();
- this.translator = translator;
- }
-
- public void addBatch(int beginRow, Object batch) {
- batches.put(new Integer(beginRow), batch);
- }
-
- public Object getBatch(int beginRow) {
-
- //check special cases first
- int batchSize = batches.size();
- if(batchSize == 0) {
- return null;
- }
- Object batch;
- if(batchSize == 1) {
- batch = batches.values().iterator().next();
- if(beginRow >= translator.getBeginRow(batch) && beginRow <=
translator.getEndRow(batch)) {
- return batch;
- }
- return null;
- }
-
- //Try to search by beginRow. This is fast.
- if((batch = batches.get(new Integer(beginRow))) != null) {
- return batch;
- }
-
- List batchList = new ArrayList(batches.values());
- return doBinarySearchForBatch(batchList, beginRow);
- }
-
- public void removeBatch(int beginRow) {
- if(batches.isEmpty()) {
- return;
- }
- batches.remove(new Integer(beginRow));
- }
-
- public Iterator getBatchIterator() {
- return batches.values().iterator();
- }
-
- public boolean isEmpty() {
- return batches.isEmpty();
- }
-
- private Object doBinarySearchForBatch(List batches, int beginRow) {
- int batchSize = batches.size();
- int beginIndex = 0;
- int midIndex;
- int endIndex = batchSize;
- Object batch;
- while(beginIndex < endIndex) {
- midIndex = (beginIndex + endIndex)/2;
- batch = batches.get(midIndex);
- if(beginRow < translator.getBeginRow(batch)){
- endIndex = midIndex;
- }else if(beginRow > translator.getEndRow(batch)) {
- beginIndex = midIndex + 1;
- }else {
- return batch;
- }
- }
- return null;
- }
-}
Deleted:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchMapValueTranslator.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchMapValueTranslator.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BatchMapValueTranslator.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -1,45 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package com.metamatrix.common.buffer.impl;
-
-
-/**
- * @since 4.3
- */
-public interface BatchMapValueTranslator {
- /**
- * Return the beginning row number of the given batch.
- * @param batchMapValue
- * @return
- * @since 4.3
- */
- int getBeginRow(Object batchMapValue);
-
- /**
- * Return the ending row number of the given batch.
- * @param batchMapValue
- * @return
- * @since 4.3
- */
- int getEndRow(Object batchMapValue);
-}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/BufferManagerImpl.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -77,7 +77,6 @@
private Map<String, TupleGroupInfo> groupInfos = new HashMap<String,
TupleGroupInfo>();
// Storage managers
- private StorageManager memoryMgr;
private StorageManager diskMgr;
// ID creator
@@ -174,9 +173,9 @@
}
synchronized(info) {
- Iterator batchIter = info.getBatchIterator();
+ Iterator<ManagedBatch> batchIter = info.getBatchIterator();
while(batchIter.hasNext()) {
- ManagedBatch batch = (ManagedBatch) batchIter.next();
+ ManagedBatch batch = batchIter.next();
switch(batch.getLocation()) {
case ManagedBatch.PERSISTENT:
stats.numPersistentBatches++;
@@ -226,14 +225,10 @@
* Add a storage manager to this buffer manager, order is unimportant
* @param storageManager Storage manager to add
*/
- public void addStorageManager(StorageManager storageManager) {
+ public void setStorageManager(StorageManager storageManager) {
Assertion.isNotNull(storageManager);
- if(storageManager.getStorageType() == StorageManager.TYPE_MEMORY) {
- this.memoryMgr = storageManager;
- } else {
- Assertion.isNull(diskMgr);
- this.diskMgr = storageManager;
- }
+ Assertion.isNull(diskMgr);
+ this.diskMgr = storageManager;
}
/**
@@ -289,9 +284,9 @@
if(! info.isRemoved()) {
info.setRemoved();
- Iterator iter = info.getBatchIterator();
+ Iterator<ManagedBatch> iter = info.getBatchIterator();
while(iter.hasNext()) {
- ManagedBatch batch = (ManagedBatch) iter.next();
+ ManagedBatch batch = iter.next();
switch(batch.getLocation()) {
case ManagedBatch.UNPINNED:
memoryState.removeUnpinned(batch);
@@ -309,9 +304,7 @@
synchronized (tupleGroupInfo) {
tupleGroupInfo.getTupleSourceIDs().remove(tupleSourceID);
}
- // Remove memory storage
- this.memoryMgr.removeBatches(tupleSourceID);
-
+
// Remove disk storage
if (this.diskMgr != null){
this.diskMgr.removeBatches(tupleSourceID);
@@ -491,12 +484,16 @@
throw new
TupleSourceNotFoundException(QueryExecPlugin.Util.getString("BufferManagerImpl.tuple_source_not_found",
tupleSourceID)); //$NON-NLS-1$
}
+ // Update tuple source state
+ ManagedBatch managedBatch = new ManagedBatch(tupleSourceID,
tupleBatch.getBeginRow(), tupleBatch.getEndRow(), bytes);
+ managedBatch.setLocation(location);
+
// Store into storage manager
try {
if(location == ManagedBatch.PERSISTENT) {
this.diskMgr.addBatch(tupleSourceID, tupleBatch, info.getTypes());
} else {
- this.memoryMgr.addBatch(tupleSourceID, tupleBatch, info.getTypes());
+ managedBatch.setBatch(tupleBatch);
}
} catch(MetaMatrixComponentException e) {
// If we were storing to memory, clean up memory we reserved
@@ -505,11 +502,7 @@
}
throw e;
}
-
- // Update tuple source state
- ManagedBatch managedBatch = new ManagedBatch(tupleSourceID,
tupleBatch.getBeginRow(), tupleBatch.getEndRow(), bytes);
- managedBatch.setLocation(location);
-
+
// Add to memory state if in memory
if(location == ManagedBatch.UNPINNED) {
this.memoryState.addUnpinned(managedBatch);
@@ -565,7 +558,7 @@
} else if(mbatch.getLocation() == ManagedBatch.PINNED) {
// Load batch from memory - already pinned
- memoryBatch = this.memoryMgr.getBatch(tupleSourceID, beginRow,
info.getTypes());
+ memoryBatch = mbatch.getBatch();
} else if(mbatch.getLocation() == ManagedBatch.UNPINNED) {
// Already in memory - just move from unpinned to pinned
@@ -574,7 +567,7 @@
this.memoryState.addPinned(mbatch);
// Load batch from memory
- memoryBatch = this.memoryMgr.getBatch(tupleSourceID, beginRow,
info.getTypes());
+ memoryBatch = mbatch.getBatch();
} else if(mbatch.getLocation() == ManagedBatch.PERSISTENT) {
memoryRequiredByBatch = mbatch.getSize();
@@ -601,26 +594,7 @@
int internalBeginRow = mbatch.getBeginRow();
memoryBatch = diskMgr.getBatch(tupleSourceID, internalBeginRow,
info.getTypes());
- try {
- memoryMgr.addBatch(tupleSourceID, memoryBatch, info.getTypes());
- } catch(MetaMatrixComponentException e) {
- memoryState.releaseMemory(mbatch.getSize(),
info.getGroupInfo());
- throw e;
- }
-
- try {
- diskMgr.removeBatch(tupleSourceID, internalBeginRow);
- } catch(TupleSourceNotFoundException e) {
- } catch(MetaMatrixComponentException e) {
- memoryState.releaseMemory(memoryRequiredByBatch,
info.getGroupInfo());
- try {
- memoryMgr.removeBatch(tupleSourceID, internalBeginRow);
- } catch(Exception e2) {
- // ignore
- }
- throw e;
- }
-
+ mbatch.setBatch(memoryBatch);
mbatch.setLocation(ManagedBatch.PINNED);
this.memoryState.addPinned(mbatch);
}
@@ -736,13 +710,13 @@
long totalMemory = config.getTotalAvailableMemory();
long released = 0;
- Iterator unpinnedIter = this.memoryState.getAllUnpinned();
+ Iterator<ManagedBatch> unpinnedIter = this.memoryState.getAllUnpinned();
while(unpinnedIter.hasNext() && // If there are unpinned batches in
memory, AND
// Defect 14573 - if we require more than what's available, then
cleanup regardless of the threshold
(memoryRequired > totalMemory - memoryState.getMemoryUsed() || // if the
memory needed is more than what's available, or
memoryState.getMemoryUsed() > targetLevel)){ // if we've crossed
the active memory threshold, then cleanup
- ManagedBatch batch = (ManagedBatch) unpinnedIter.next();
+ ManagedBatch batch = unpinnedIter.next();
TupleSourceID tsID = batch.getTupleSourceID();
released += releaseMemory(batch, tsID);
@@ -763,9 +737,9 @@
boolean cleanForSessionSucceeded = false;
long released = 0;
- Iterator unpinnedIter = this.memoryState.getAllUnpinned();
+ Iterator<ManagedBatch> unpinnedIter = this.memoryState.getAllUnpinned();
while(unpinnedIter.hasNext()) {
- ManagedBatch batch = (ManagedBatch) unpinnedIter.next();
+ ManagedBatch batch = unpinnedIter.next();
TupleSourceID tsID = batch.getTupleSourceID();
TupleSourceInfo tsInfo = getTupleSourceInfo(tsID, false);
if(tsInfo == null) {
@@ -828,15 +802,7 @@
}
// This batch is still unpinned - move to persistent storage
- int beginRow = batch.getBeginRow();
- TupleBatch dataBatch = null;
- try {
- dataBatch = memoryMgr.getBatch(tsID, beginRow, info.getTypes());
- } catch(TupleSourceNotFoundException e) {
- return 0;
- } catch(MetaMatrixComponentException e) {
- return 0;
- }
+ TupleBatch dataBatch = batch.getBatch();
try {
diskMgr.addBatch(tsID, dataBatch, info.getTypes());
@@ -845,13 +811,7 @@
return 0;
}
- try {
- memoryMgr.removeBatch(tsID, beginRow);
- } catch(TupleSourceNotFoundException e) {
- // ignore
- } catch(MetaMatrixComponentException e) {
- // ignore
- }
+ batch.setBatch(null);
// Update memory
batch.setLocation(ManagedBatch.PERSISTENT);
@@ -1048,18 +1008,18 @@
* @see com.metamatrix.common.buffer.BufferManager#releasePinnedBatches()
*/
public void releasePinnedBatches() throws MetaMatrixComponentException {
- Map threadPinned = memoryState.getPinnedByCurrentThread();
+ Map<TupleSourceID, Map<Integer, ManagedBatch>> threadPinned =
memoryState.getPinnedByCurrentThread();
if (threadPinned == null) {
return;
}
- for (Iterator i = threadPinned.entrySet().iterator(); i.hasNext();) {
- Map.Entry entry = (Map.Entry)i.next();
+ for (Iterator<Map.Entry<TupleSourceID, Map<Integer,
ManagedBatch>>> i = threadPinned.entrySet().iterator(); i.hasNext();) {
+ Map.Entry<TupleSourceID, Map<Integer, ManagedBatch>> entry =
i.next();
i.remove();
- TupleSourceID tsid = (TupleSourceID)entry.getKey();
- Map pinnedBatches = (Map)entry.getValue();
+ TupleSourceID tsid = entry.getKey();
+ Map<Integer, ManagedBatch> pinnedBatches = entry.getValue();
try {
- for (Iterator j = pinnedBatches.values().iterator(); j.hasNext();) {
- ManagedBatch batch = (ManagedBatch)j.next();
+ for (Iterator<ManagedBatch> j = pinnedBatches.values().iterator();
j.hasNext();) {
+ ManagedBatch batch = j.next();
//TODO: add trace logging about the batch that is being unpinned
unpinTupleBatch(tsid, batch.getBeginRow(), batch.getEndRow());
@@ -1074,7 +1034,7 @@
* for testing purposes
*/
public int getPinnedCount() {
- Map pinned = memoryState.getAllPinned();
+ Map<TupleSourceID, Map<Integer, ManagedBatch>> pinned =
memoryState.getAllPinned();
int count = 0;
@@ -1082,8 +1042,8 @@
return count;
}
- for (Iterator i = pinned.values().iterator(); i.hasNext();) {
- count += ((Map)i.next()).size();
+ for (Iterator<Map<Integer, ManagedBatch>> i =
pinned.values().iterator(); i.hasNext();) {
+ count += i.next().size();
}
return count;
Modified: trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/ManagedBatch.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -26,6 +26,7 @@
import java.util.LinkedList;
import java.util.List;
+import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.core.util.HashCodeUtil;
@@ -56,12 +57,13 @@
private int location;
private int pinnedCount;
private int hashCode;
+ private TupleBatch batch;
// logging
private static int STACK_LEVELS_TO_OMIT = 2;
private static int STACK_LEVELS_TO_CAPTURE = 5;
- private List whoCalledUs;
+ private List<String> whoCalledUs;
private String sCallStackTimeStamp;
/**
@@ -75,6 +77,14 @@
this.hashCode = HashCodeUtil.hashCode(tupleSourceID.hashCode(), beginRow);
}
+ public TupleBatch getBatch() {
+ return batch;
+ }
+
+ public void setBatch(TupleBatch batch) {
+ this.batch = batch;
+ }
+
/**
* Get the tuple source ID
* @return Tuple sourceID
@@ -147,7 +157,7 @@
*/
StackTraceElement[] elements = new Exception().getStackTrace();
- whoCalledUs = new LinkedList();
+ whoCalledUs = new LinkedList<String>();
for ( int i = STACK_LEVELS_TO_OMIT; i < elements.length && i <
STACK_LEVELS_TO_OMIT + STACK_LEVELS_TO_CAPTURE; i++ ) {
whoCalledUs.add(elements[ i ].toString());
@@ -158,7 +168,7 @@
/**
* Returns call stack
*/
- public List getCallStack() {
+ public List<String> getCallStack() {
return whoCalledUs;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/impl/TupleSourceInfo.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -34,15 +34,6 @@
*/
public class TupleSourceInfo {
- private static final BatchMapValueTranslator TRANSLATOR = new
BatchMapValueTranslator() {
- public int getBeginRow(Object batchMapValue) {
- return ((ManagedBatch)batchMapValue).getBeginRow();
- }
- public int getEndRow(Object batchMapValue) {
- return ((ManagedBatch)batchMapValue).getEndRow();
- }
- };
-
private TupleSourceType type; // Type of TupleSource, as defined in
BufferManager constants
private TupleSourceID tsID;
private List schema;
@@ -51,8 +42,8 @@
private TupleSourceStatus status;
private TupleGroupInfo groupInfo;
private boolean removed = false;
+ private TreeMap<Integer, ManagedBatch> batches = new TreeMap<Integer,
ManagedBatch>();
- private BatchMap batches;
private boolean lobs;
/**
@@ -63,7 +54,6 @@
* @param type Type of tuple source, as defined in BufferManager constants
*/
public TupleSourceInfo(TupleSourceID tsID, List schema, String[] types,
TupleGroupInfo groupInfo, TupleSourceType type) {
-
this.tsID = tsID;
this.schema = schema;
this.types = types;
@@ -71,10 +61,29 @@
this.status = TupleSourceStatus.ACTIVE;
this.rowCount = 0;
this.type = type;
- this.batches = new BatchMap(TRANSLATOR);
this.lobs = checkForLobs();
}
+ public void addBatch(ManagedBatch batch) {
+ batches.put(batch.getBeginRow(), batch);
+ }
+
+ public ManagedBatch getBatch(int beginRow) {
+ Map.Entry<Integer, ManagedBatch> entry = batches.floorEntry(beginRow);
+ if (entry != null && entry.getValue().getEndRow() >= beginRow) {
+ return entry.getValue();
+ }
+ return null;
+ }
+
+ public void removeBatch(int beginRow) {
+ batches.remove(new Integer(beginRow));
+ }
+
+ public Iterator<ManagedBatch> getBatchIterator() {
+ return batches.values().iterator();
+ }
+
/**
* Get the tuple source identifier
* @return Tuple source identifier
@@ -159,47 +168,10 @@
this.removed = true;
}
- /**
- * Add a managed batch to this tuple source
- * @param batch Managed batch for this tuple source
- */
- public void addBatch(ManagedBatch batch) {
- batches.addBatch(batch.getBeginRow(), batch);
- }
-
- /**
- * Remove a managed batch for this tuple source,
- * indexed by beginRow
- * @param beginRow First row of batch to remove
- */
- public void removeBatch(int beginRow) {
- batches.removeBatch(beginRow);
- }
-
- /**
- * Get a batch, specified by beginRow
- * @param beginRow First row of batch to retrieve
- * @return Managed batch or null if not found
- */
- public ManagedBatch getBatch(int beginRow) {
- return (ManagedBatch)batches.getBatch(beginRow);
- }
-
public boolean lobsInSource() {
return this.lobs;
}
- /**
- * Get iterator directly on the batches contained by this
- * tuple source. To use this safely, the user should ensure
- * through locking that no other thread are modifying this
- * TupleSourceInfo while the iterator is being used.
- * @return Unsafe iterator
- */
- public Iterator getBatchIterator() {
- return this.batches.getBatchIterator();
- }
-
public String[] getTypes() {
return types;
}
Modified:
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java
===================================================================
---
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/main/java/com/metamatrix/common/buffer/storage/file/FileStorageManager.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -33,8 +33,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -72,7 +72,7 @@
private File dirFile;
// State
- private Map tupleSourceMap = new HashMap(); // TupleSourceID ->
TupleSourceInfo
+ private Map<TupleSourceID, TupleSourceInfo> tupleSourceMap = new
HashMap<TupleSourceID, TupleSourceInfo>();
private Map<File, RandomAccessFile> fileCache = Collections.synchronizedMap(new
LinkedHashMap<File, RandomAccessFile>() {
@Override
protected boolean removeEldestEntry(
@@ -119,14 +119,6 @@
}
/**
- * Return file type: {@link com.metamatrix.common.buffer.StorageManager.TYPE_FILE}
- * @return File type constant
- */
- public int getStorageType() {
- return StorageManager.TYPE_FILE;
- }
-
- /**
* Look up tuple source info and possibly create. First the file map is used to find
an
* existing file info. If the info is found it is returned. If not, then
* a TupleSourceInfo is created according to shouldCreate flag
@@ -138,7 +130,7 @@
// Try to find in cache
synchronized(tupleSourceMap) {
- TupleSourceInfo info = (TupleSourceInfo) tupleSourceMap.get(sourceID);
+ TupleSourceInfo info = tupleSourceMap.get(sourceID);
if(info == null && shouldCreate) {
info = new TupleSourceInfo();
tupleSourceMap.put(sourceID, info);
@@ -285,7 +277,7 @@
throw new
TupleSourceNotFoundException(QueryExecPlugin.Util.getString("BufferManagerImpl.tuple_source_not_found",
sourceID)); //$NON-NLS-1$
}
// Find pointer
- PointerInfo pointerInfo = (PointerInfo) info.tupleBatchPointers.get(new
Integer(beginRow));
+ PointerInfo pointerInfo = info.tupleBatchPointers.get(new
Integer(beginRow));
Assertion.isNotNull(pointerInfo);
FileInfo fileInfo = pointerInfo.fileInfo;
@@ -305,9 +297,9 @@
batch.readExternal(ois);
return batch;
} catch(IOException e) {
- throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading",
fileInfo.file.getAbsoluteFile()));
+ throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading",
fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
} catch (ClassNotFoundException e) {
- throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading",
fileInfo.file.getAbsoluteFile()));
+ throw new MetaMatrixComponentException(e,
QueryExecPlugin.Util.getString("FileStoreageManager.error_reading",
fileInfo.file.getAbsoluteFile())); //$NON-NLS-1$
} finally {
fileInfo.close();
}
@@ -315,19 +307,6 @@
}
/**
- * This method does nothing - rather than deleting batches from the middle of a
RandomAccessFile,
- * which would be very expensive, we just handle the possibility that a batch already
exists
- * in the addBatch method.
- * @param sourceID Source identifier
- * @param beginRow Beginning batch row to remove
- */
- public void removeBatch(TupleSourceID sourceID, int beginRow)
- throws TupleSourceNotFoundException, MetaMatrixComponentException {
-
- // nothing - don't remove batches as it is too expensive
- }
-
- /**
* Remove all batches for a sourceID. Before removal, the file is closed.
* @param sourceID Tuple source ID
*/
@@ -335,7 +314,7 @@
TupleSourceInfo info = null;
// Remove info from the file map
synchronized(tupleSourceMap) {
- info = (TupleSourceInfo)tupleSourceMap.remove(sourceID);
+ info = tupleSourceMap.remove(sourceID);
}
// Didn't find a file
@@ -355,7 +334,7 @@
// If open, close the file and decrement the open file counter
for (int i = 0; i < info.storageFiles.size(); i++) {
- FileInfo fileInfo = (FileInfo)info.storageFiles.get(i);
+ FileInfo fileInfo = info.storageFiles.get(i);
fileInfo.delete();
}
// Delete the file and mark info as being removed
@@ -371,10 +350,12 @@
LogManager.logDetail(LogConstants.CTX_STORAGE_MGR, "Removing all storage area
files "); //$NON-NLS-1$
- Iterator tsIter = tupleSourceMap.keySet().iterator();
-
- while(tsIter.hasNext()) {
- TupleSourceID key = (TupleSourceID)tsIter.next();
+ List<TupleSourceID> ids = null;
+ synchronized (tupleSourceMap) {
+ ids = new LinkedList<TupleSourceID>(tupleSourceMap.keySet());
+ }
+
+ for (TupleSourceID key : ids) {
try {
removeBatches(key);
} catch (MetaMatrixComponentException e) {
@@ -398,10 +379,6 @@
this.file = file;
}
- public boolean isOpen() {
- return fileData != null;
- }
-
public void open() throws FileNotFoundException {
if(this.fileData == null) {
this.fileData = fileCache.remove(this.file);
@@ -454,15 +431,15 @@
}
private static class TupleSourceInfo {
- Map tupleBatchPointers = new HashMap(); // beginRow -> PointerInfo
- List storageFiles = new ArrayList(2); // Stores all the FileInfos for this
tupleSource
+ Map<Integer, PointerInfo> tupleBatchPointers = new HashMap<Integer,
PointerInfo>();
+ List<FileInfo> storageFiles = new ArrayList<FileInfo>(2); // Stores
all the FileInfos for this tupleSource
private boolean isRemoved = false;
FileInfo getMostRecentlyCreatedFile() {
if (storageFiles.isEmpty()) {
return null;
}
- return (FileInfo)storageFiles.get(storageFiles.size() - 1);
+ return storageFiles.get(storageFiles.size() - 1);
}
}
Copied: trunk/engine/src/test/java/com/metamatrix/common/buffer/BufferManagerFactory.java
(from rev 1214,
trunk/engine/src/main/java/com/metamatrix/common/buffer/BufferManagerFactory.java)
===================================================================
--- trunk/engine/src/test/java/com/metamatrix/common/buffer/BufferManagerFactory.java
(rev 0)
+++
trunk/engine/src/test/java/com/metamatrix/common/buffer/BufferManagerFactory.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -0,0 +1,141 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package com.metamatrix.common.buffer;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import com.metamatrix.api.exception.MetaMatrixComponentException;
+import com.metamatrix.common.buffer.impl.BufferManagerImpl;
+import com.metamatrix.query.execution.QueryExecPlugin;
+
+/**
+ * <p>Factory for BufferManager instances. One method will get
+ * a server buffer manager, as it should be instantiated in a running
+ * MetaMatrix server. That BufferManager is configured mostly by the
+ * passed in properties.</p>
+ *
+ * <p>The other method returns a stand-alone, in-memory buffer manager. This
+ * is typically used for either in-memory testing or any time the
+ * query processor component is not expected to run out of memory, such as
+ * within the modeler.</p>
+ */
+public class BufferManagerFactory {
+
+ public static class MemoryStorageManager implements StorageManager {
+
+ // TupleSourceID -> List<TupleBatch> (ordered by startRow)
+ private Map<TupleSourceID, Map<Integer, TupleBatch>> storage =
Collections.synchronizedMap(new HashMap<TupleSourceID, Map<Integer,
TupleBatch>>());
+
+ /**
+ * @see StorageManager#initialize(Properties)
+ */
+ public void initialize(Properties props) throws MetaMatrixComponentException {
+ }
+
+ /**
+ * @see StorageManager#addBatch(TupleSourceID, TupleBatch)
+ */
+ public void addBatch(TupleSourceID storageID, TupleBatch batch, String[] types)
+ throws MetaMatrixComponentException {
+
+ Map<Integer, TupleBatch> batches = null;
+ synchronized(this.storage) {
+ batches = storage.get(storageID);
+ if(batches == null) {
+ batches = new HashMap<Integer, TupleBatch>();
+ this.storage.put(storageID, batches);
+ }
+ }
+
+ synchronized(batches) {
+ batches.put(batch.getBeginRow(), batch);
+ }
+ }
+
+ /**
+ * @see StorageManager#getBatch(TupleSourceID, int, int)
+ */
+ public TupleBatch getBatch(TupleSourceID storageID, int beginRow, String[] types)
+ throws TupleSourceNotFoundException, MetaMatrixComponentException {
+
+ Map<Integer, TupleBatch> batches = storage.get(storageID);
+
+ if(batches == null) {
+ throw new
TupleSourceNotFoundException(QueryExecPlugin.Util.getString("BufferManagerImpl.tuple_source_not_found",
storageID)); //$NON-NLS-1$
+ }
+
+ synchronized(batches) {
+ TupleBatch batch = batches.get(beginRow);
+ if(batch == null) {
+ throw new MetaMatrixComponentException("unknown batch");
//$NON-NLS-1$
+ }
+ return batch;
+ }
+ }
+
+ /**
+ * @see StorageManager#removeStorageArea(TupleSourceID)
+ */
+ public void removeBatches(TupleSourceID storageID) throws
MetaMatrixComponentException {
+ storage.remove(storageID);
+ }
+
+ /**
+ * @see StorageManager#shutdown()
+ */
+ public void shutdown() {
+ this.storage.clear();
+ this.storage = null;
+ }
+
+ }
+
+ private static BufferManager INSTANCE;
+
+ /**
+ * Helper to get a buffer manager all set up for unmanaged standalone use. This is
+ * typically used for testing or when memory is not an issue.
+ * @return BufferManager ready for use
+ */
+ public static BufferManager getStandaloneBufferManager() throws
MetaMatrixComponentException {
+ if (INSTANCE == null) {
+ BufferManager bufferMgr = new BufferManagerImpl();
+ Properties props = new Properties();
+ props.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE,
String.valueOf(Long.MAX_VALUE));
+ props.setProperty(BufferManagerPropertyNames.SESSION_USE_PERCENTAGE,
"100"); //$NON-NLS-1$
+ props.setProperty(BufferManagerPropertyNames.LOG_STATS_INTERVAL, "0");
//$NON-NLS-1$
+ props.setProperty(BufferManagerPropertyNames.MANAGEMENT_INTERVAL,
"0"); //$NON-NLS-1$
+ bufferMgr.initialize("local", props); //$NON-NLS-1$
+
+ // Add unmanaged memory storage manager
+ bufferMgr.setStorageManager(new MemoryStorageManager());
+ INSTANCE = bufferMgr;
+ }
+
+ return INSTANCE;
+ }
+
+}
Modified: trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBatchMap.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBatchMap.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBatchMap.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -28,6 +28,8 @@
import java.util.List;
import java.util.Set;
+import com.metamatrix.common.buffer.TupleSourceID;
+
import junit.framework.TestCase;
@@ -35,14 +37,6 @@
* @since 4.3
*/
public class TestBatchMap extends TestCase{
- private BatchMapValueTranslator batchMapValueTranslator = new
BatchMapValueTranslator() {
- public int getBeginRow(Object batchMapValue) {
- return ((Batch)batchMapValue).getBeginRow();
- }
- public int getEndRow(Object batchMapValue) {
- return ((Batch)batchMapValue).getEndRow();
- }
- };
public TestBatchMap(String arg0) {
super(arg0);
@@ -50,17 +44,16 @@
//no batch
public void testAddAndGetBatch1() {
- BatchMap batches = new BatchMap(batchMapValueTranslator);
- assertTrue(batches.isEmpty());
+ TupleSourceInfo batches = new TupleSourceInfo(null, null, null, null, null);
assertNull(batches.getBatch(1));
assertNull(batches.getBatch(2));
}
//one batch
public void testAddAndGetBatch2() {
- BatchMap batches = new BatchMap(batchMapValueTranslator);
- Batch batch1 = new Batch(1,5);
- batches.addBatch(1, batch1);
+ TupleSourceInfo batches = new TupleSourceInfo(null, null, null, null, null);
+ ManagedBatch batch1 = createManagedBatch(1, 5);
+ batches.addBatch(batch1);
assertEquals(batch1, batches.getBatch(1));
assertEquals(batch1, batches.getBatch(2));
assertEquals(batch1, batches.getBatch(5));
@@ -70,11 +63,11 @@
//two batches
public void testAddAndGetBatch3() {
- BatchMap batches = new BatchMap(batchMapValueTranslator);
- Batch batch1 = new Batch(1,5);
- Batch batch2 = new Batch(6,10);
- batches.addBatch(1, batch1);
- batches.addBatch(6, batch2);
+ TupleSourceInfo batches = new TupleSourceInfo(null, null, null, null, null);
+ ManagedBatch batch1 = createManagedBatch(1, 5);
+ ManagedBatch batch2 = createManagedBatch(6, 10);
+ batches.addBatch(batch1);
+ batches.addBatch(batch2);
assertEquals(batch1, batches.getBatch(1));
assertEquals(batch1, batches.getBatch(2));
assertEquals(batch1, batches.getBatch(5));
@@ -83,22 +76,26 @@
assertEquals(batch2, batches.getBatch(10));
assertNull(batches.getBatch(11));
}
+
+ private ManagedBatch createManagedBatch(int begin, int end) {
+ return new ManagedBatch(new TupleSourceID("x"), begin, end, 0);
//$NON-NLS-1$
+ }
//more batches, no adding order
public void testAddAndGetBatch4() {
- BatchMap batches = new BatchMap(batchMapValueTranslator);
+ TupleSourceInfo batches = new TupleSourceInfo(null, null, null, null, null);
Set batchSet = new HashSet();
List batchList = new ArrayList();
for(int i=1; i<10000;) {
- Batch batch = new Batch(i,i+4);
+ ManagedBatch batch = createManagedBatch(i, i + 4);
batchSet.add(batch);
batchList.add(batch);
i += 5;
}
Iterator iter = batchSet.iterator();
while(iter.hasNext()) {
- Batch next = (Batch)iter.next();
- batches.addBatch(next.getBeginRow(), next);
+ ManagedBatch next = (ManagedBatch)iter.next();
+ batches.addBatch(next);
}
for(int i=1; i<10000;) {
assertEquals(batchList.get(i/5), batches.getBatch(i));
@@ -112,39 +109,23 @@
}
public void testAddAndRemoveBatch() {
- BatchMap batches = new BatchMap(batchMapValueTranslator);
- assertTrue(batches.isEmpty());
- Batch batch1 = new Batch(1,5);
- Batch batch2 = new Batch(6,10);
- batches.addBatch(1, batch1);
- batches.addBatch(2, batch2);
+ TupleSourceInfo batches = new TupleSourceInfo(null, null, null, null, null);
+ ManagedBatch batch1 = createManagedBatch(1, 5);
+ ManagedBatch batch2 = createManagedBatch(6, 10);
+ batches.addBatch(batch1);
+ batches.addBatch(batch2);
assertEquals(batch1, batches.getBatch(1));
batches.removeBatch(1);
assertNull(batches.getBatch(1));
assertNull(batches.getBatch(2));
assertNull(batches.getBatch(5));
- batches.removeBatch(2);
+ batches.removeBatch(6);
assertNull(batches.getBatch(1));
assertNull(batches.getBatch(2));
assertNull(batches.getBatch(5));
assertNull(batches.getBatch(6));
assertNull(batches.getBatch(8));
assertNull(batches.getBatch(10));
- assertTrue(batches.isEmpty());
}
- class Batch{
- private int beginRow;
- private int endRow;
- Batch(int beginRow, int endRow){
- Batch.this.beginRow = beginRow;
- Batch.this.endRow = endRow;
- }
- int getBeginRow() {
- return Batch.this.beginRow;
- }
- int getEndRow() {
- return Batch.this.endRow;
- }
- }
}
Modified:
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/test/java/com/metamatrix/common/buffer/impl/TestBufferManagerImpl.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -47,7 +47,7 @@
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
import com.metamatrix.common.buffer.BufferManager.TupleSourceStatus;
import com.metamatrix.common.buffer.BufferManager.TupleSourceType;
-import com.metamatrix.common.buffer.storage.memory.MemoryStorageManager;
+import com.metamatrix.common.buffer.BufferManagerFactory.MemoryStorageManager;
import com.metamatrix.common.lob.ByteLobChunkStream;
import com.metamatrix.common.lob.LobChunk;
import com.metamatrix.common.lob.LobChunkInputStream;
@@ -69,7 +69,7 @@
super(arg0);
}
- public static BufferManager getTestBufferManager(long bytesAvailable, StorageManager
sm1, StorageManager sm2) throws MetaMatrixComponentException {
+ public static BufferManager getTestBufferManager(long bytesAvailable, StorageManager
sm2) throws MetaMatrixComponentException {
// Get the properties for BufferManager
Properties bmProps = new Properties();
bmProps.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, "" +
bytesAvailable); //$NON-NLS-1$
@@ -77,30 +77,19 @@
BufferManager bufferManager = new BufferManagerImpl();
bufferManager.initialize("local", bmProps); //$NON-NLS-1$
- // Add storage managers
- bufferManager.addStorageManager(sm1);
-
if(sm2 != null) {
- bufferManager.addStorageManager(sm2);
+ bufferManager.setStorageManager(sm2);
}
return bufferManager;
}
- private StorageManager createMemoryStorageManager() {
- return new MemoryStorageManager();
- }
-
private StorageManager createFakeDatabaseStorageManager() {
- return new MemoryStorageManager() {
- public int getStorageType() {
- return StorageManager.TYPE_DATABASE;
- }
- };
+ return new MemoryStorageManager();
}
- public void helpTestAddBatches(StorageManager sm1, StorageManager sm2, int
memorySize, int numBatches, int rowsPerBatch) throws MetaMatrixComponentException,
MetaMatrixProcessingException {
- BufferManager mgr = getTestBufferManager(memorySize, sm1, sm2);
+ public void helpTestAddBatches(StorageManager sm2, int memorySize, int numBatches,
int rowsPerBatch) throws MetaMatrixComponentException, MetaMatrixProcessingException {
+ BufferManager mgr = getTestBufferManager(memorySize, sm2);
List expectedRows = new ArrayList();
@@ -136,20 +125,28 @@
// Check batches in sm1 - 3 should fit in 1000 bytes of memory
int batchesInMemory = (int) (memorySize * 1000000 / batchSize);
- for(int b=0; b<batchesInMemory; b++) {
- int begin = (b*rowsPerBatch) + 1;
-
- TupleBatch batch = sm1.getBatch(tsID, begin, null);
- assertNotNull("batch " + (b+1) + " is null ", batch);
//$NON-NLS-1$ //$NON-NLS-2$
- }
+ if (sm2 != null) {
+ for(int b=0; b<batchesInMemory; b++) {
+ int begin = (b*rowsPerBatch) + 1;
+ try {
+ sm2.getBatch(tsID, begin, null);
+ fail("Expected exception"); //$NON-NLS-1$
+ } catch (TupleSourceNotFoundException e) {
+
+ } catch (MetaMatrixComponentException e) {
+
+ }
+ }
+
+ // Check batches in sm2
+ for(int b=batchesInMemory+1; b<numBatches; b++) {
+ int begin = (b*rowsPerBatch) + 1;
+
+ TupleBatch batch = sm2.getBatch(tsID, begin, null);
+ assertNotNull("batch " + (b+1) + " is null ", batch);
//$NON-NLS-1$ //$NON-NLS-2$
+ }
+ }
- // Check batches in sm2
- for(int b=batchesInMemory+1; b<numBatches; b++) {
- int begin = (b*rowsPerBatch) + 1;
-
- TupleBatch batch = sm2.getBatch(tsID, begin, null);
- assertNotNull("batch " + (b+1) + " is null ", batch);
//$NON-NLS-1$ //$NON-NLS-2$
- }
// Check row count
int rowCount = mgr.getRowCount(tsID);
@@ -168,16 +165,14 @@
}
public void testSpanStorage() throws Exception {
- helpTestAddBatches(createMemoryStorageManager(),
- createFakeDatabaseStorageManager(),
+ helpTestAddBatches(createFakeDatabaseStorageManager(),
1,
50,
100);
}
public void testStandalone() throws Exception {
- helpTestAddBatches(createMemoryStorageManager(),
- null,
+ helpTestAddBatches(null,
10,
5,
10);
@@ -218,7 +213,7 @@
}
public void testCreateLobReference() throws Exception {
- final BufferManagerImpl mgr = (BufferManagerImpl)getTestBufferManager(1,
createMemoryStorageManager(), createFakeDatabaseStorageManager());
+ final BufferManagerImpl mgr = (BufferManagerImpl)getTestBufferManager(1,
createFakeDatabaseStorageManager());
XMLType xml1 = new XMLType(new SQLXMLImpl("<foo/>"));
//$NON-NLS-1$
XMLType xml2 = new XMLType(new SQLXMLImpl("<bar/>"));
//$NON-NLS-1$
@@ -257,7 +252,7 @@
}
public void testAddStreamablePart() throws Exception {
- final BufferManager mgr = getTestBufferManager(1, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ final BufferManager mgr = getTestBufferManager(1,
createFakeDatabaseStorageManager());
// save the lob
List schema = new ArrayList();
@@ -318,7 +313,7 @@
public void testPinning1() throws Exception {
- BufferManager mgr = getTestBufferManager(1, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
schema.add("val"); //$NON-NLS-1$
@@ -349,7 +344,7 @@
}
public void testUnpinOfUnpinnedBatch() throws Exception {
- BufferManager mgr = getTestBufferManager(1, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
schema.add("val"); //$NON-NLS-1$
@@ -403,7 +398,7 @@
}
public void testDeadlockOnMultiThreadClean() throws Exception {
- BufferManager mgr = getTestBufferManager(1, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
schema.add("val"); //$NON-NLS-1$
@@ -440,7 +435,7 @@
}
public void testSessionMax_Fail() throws Exception {
- BufferManager mgr = getTestBufferManager(1, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
List schema = new ArrayList();
@@ -481,7 +476,7 @@
* @since 4.3
*/
public void testDefect_18499() throws Exception {
- BufferManager mgr = getTestBufferManager(1, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
schema.add("col"); //$NON-NLS-1$
String[] schemaTypes = new String[] {DataTypeManager.DefaultDataTypes.STRING};
@@ -502,7 +497,7 @@
}
public void testDefect18497() throws Exception {
- BufferManager mgr = getTestBufferManager(1, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
List schema = new ArrayList();
@@ -536,7 +531,7 @@
//two threads do the cleaning at the same time
public void testDefect19325() throws Exception{
- BufferManagerImpl mgr = (BufferManagerImpl)getTestBufferManager(1,
createMemoryStorageManager(), createFakeDatabaseStorageManager());
+ BufferManagerImpl mgr = (BufferManagerImpl)getTestBufferManager(1,
createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
List schema = new ArrayList();
schema.add("val"); //$NON-NLS-1$
@@ -572,7 +567,7 @@
//test many small batches
public void testSmallBatches() throws Exception{
- BufferManager mgr = getTestBufferManager(50, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ BufferManager mgr = getTestBufferManager(50,
createFakeDatabaseStorageManager());
TupleSourceID tsID = null;
try {
List schema = new ArrayList();
@@ -606,7 +601,7 @@
//going backward
public void testPinning2() throws Exception {
- BufferManager mgr = getTestBufferManager(1, createMemoryStorageManager(),
createFakeDatabaseStorageManager());
+ BufferManager mgr = getTestBufferManager(1, createFakeDatabaseStorageManager());
List schema = new ArrayList();
schema.add("val"); //$NON-NLS-1$
Modified:
trunk/engine/src/test/java/com/metamatrix/common/buffer/storage/file/TestFileStorageManager.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/common/buffer/storage/file/TestFileStorageManager.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/test/java/com/metamatrix/common/buffer/storage/file/TestFileStorageManager.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -322,7 +322,7 @@
}
}
- public void testAddTwice() {
+ public void testAddTwice() throws Exception {
StorageManager sm = getStorageManager(null);
TupleSourceID tsID = new TupleSourceID("local,1:0"); //$NON-NLS-1$
TupleBatch batch = exampleBatch(1, 20);
@@ -331,14 +331,9 @@
// Add batch
sm.addBatch(tsID, batch, null);
- // Remove batch (does nothing)
- sm.removeBatch(tsID, batch.getBeginRow());
-
// Add batch again
sm.addBatch(tsID, batch, null);
- } catch(MetaMatrixException e) {
- fail("Unexpected exception of type " + e.getClass().getName() +
": " + e.getMessage()); //$NON-NLS-1$ //$NON-NLS-2$
} finally {
try {
sm.removeBatches(tsID);
@@ -527,7 +522,6 @@
// Remove
if(added[batch]) {
//System.out.println(tsID.toString() + ": removing batch
" + batch);
- sm.removeBatch(tsID, batches[batch].getBeginRow());
added[batch] = false;
}
}
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/TestVirtualDepJoin.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -38,8 +38,8 @@
import com.metamatrix.common.buffer.BufferManager;
import com.metamatrix.common.buffer.BufferManagerPropertyNames;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
+import com.metamatrix.common.buffer.BufferManagerFactory.MemoryStorageManager;
import com.metamatrix.common.buffer.impl.BufferManagerImpl;
-import com.metamatrix.common.buffer.storage.memory.MemoryStorageManager;
import com.metamatrix.common.types.DataTypeManager;
import com.metamatrix.query.mapping.relational.QueryNode;
import com.metamatrix.query.optimizer.TestOptimizer;
@@ -491,17 +491,16 @@
private BufferManager createCustomBufferMgr(int batchSize) throws
MetaMatrixComponentException {
BufferManager bufferMgr = new BufferManagerImpl();
Properties props = new Properties();
- props.setProperty(BufferManagerPropertyNames.ID_CREATOR,
"com.metamatrix.common.buffer.impl.LongIDCreator"); //$NON-NLS-1$
- props.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, "" +
Long.MAX_VALUE); //$NON-NLS-1$
+ props.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE,
String.valueOf(Long.MAX_VALUE));
props.setProperty(BufferManagerPropertyNames.SESSION_USE_PERCENTAGE,
"100"); //$NON-NLS-1$
props.setProperty(BufferManagerPropertyNames.LOG_STATS_INTERVAL, "0");
//$NON-NLS-1$
props.setProperty(BufferManagerPropertyNames.MANAGEMENT_INTERVAL, "0");
//$NON-NLS-1$
- props.setProperty(BufferManagerPropertyNames.PROCESSOR_BATCH_SIZE, "" +
batchSize); //$NON-NLS-1$
- props.setProperty(BufferManagerPropertyNames.CONNECTOR_BATCH_SIZE, "" +
batchSize);//$NON-NLS-1$
+ props.setProperty(BufferManagerPropertyNames.PROCESSOR_BATCH_SIZE,
String.valueOf(batchSize));
+ props.setProperty(BufferManagerPropertyNames.CONNECTOR_BATCH_SIZE,
String.valueOf(batchSize));
bufferMgr.initialize("local", props); //$NON-NLS-1$
// Add unmanaged memory storage manager
- bufferMgr.addStorageManager(new MemoryStorageManager());
+ bufferMgr.setStorageManager(new MemoryStorageManager());
return bufferMgr;
}
Modified:
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java
===================================================================
---
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/engine/src/test/java/com/metamatrix/query/processor/relational/NodeTestUtil.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -33,8 +33,8 @@
import com.metamatrix.common.buffer.TupleBatch;
import com.metamatrix.common.buffer.TupleSourceID;
import com.metamatrix.common.buffer.TupleSourceNotFoundException;
+import com.metamatrix.common.buffer.BufferManagerFactory.MemoryStorageManager;
import com.metamatrix.common.buffer.impl.BufferManagerImpl;
-import com.metamatrix.common.buffer.storage.memory.MemoryStorageManager;
import com.metamatrix.core.MetaMatrixRuntimeException;
@@ -45,7 +45,6 @@
static BufferManager getTestBufferManager(long bytesAvailable) {
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManagerPropertyNames.ID_CREATOR,
"com.metamatrix.common.buffer.impl.LongIDCreator"); //$NON-NLS-1$
bmProps.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, "" +
bytesAvailable); //$NON-NLS-1$
return createBufferManager(bmProps);
}
@@ -54,7 +53,6 @@
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManagerPropertyNames.ID_CREATOR,
"com.metamatrix.common.buffer.impl.LongIDCreator"); //$NON-NLS-1$
bmProps.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, "" +
bytesAvailable); //$NON-NLS-1$
bmProps.setProperty(BufferManagerPropertyNames.PROCESSOR_BATCH_SIZE, ""
+ procBatchSize); //$NON-NLS-1$
bmProps.setProperty(BufferManagerPropertyNames.CONNECTOR_BATCH_SIZE, ""
+ connectorBatchSize); //$NON-NLS-1$
@@ -65,7 +63,6 @@
// Get the properties for BufferManager
Properties bmProps = new Properties();
- bmProps.setProperty(BufferManagerPropertyNames.ID_CREATOR,
"com.metamatrix.common.buffer.impl.LongIDCreator"); //$NON-NLS-1$
bmProps.setProperty(BufferManagerPropertyNames.MEMORY_AVAILABLE, "" +
bytesAvailable); //$NON-NLS-1$
bmProps.setProperty(BufferManagerPropertyNames.PROCESSOR_BATCH_SIZE, ""
+ procBatchSize); //$NON-NLS-1$
return createBufferManager(bmProps);
@@ -81,22 +78,14 @@
}
// Add storage managers
- bufferManager.addStorageManager(createMemoryStorageManager());
- bufferManager.addStorageManager(createFakeDatabaseStorageManager());
+ bufferManager.setStorageManager(createFakeDatabaseStorageManager());
return bufferManager;
}
- private static StorageManager createMemoryStorageManager() {
- return new MemoryStorageManager();
- }
private static StorageManager createFakeDatabaseStorageManager() {
- return new MemoryStorageManager() {
- public int getStorageType() {
- return StorageManager.TYPE_DATABASE;
- }
- };
+ return new MemoryStorageManager();
}
public static class TestableBufferManagerImpl extends BufferManagerImpl {
Modified:
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java
===================================================================
---
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/runtime/src/main/java/com/metamatrix/dqp/embedded/services/EmbeddedBufferService.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -38,7 +38,6 @@
import com.metamatrix.common.buffer.StorageManager;
import com.metamatrix.common.buffer.impl.BufferManagerImpl;
import com.metamatrix.common.buffer.storage.file.FileStorageManager;
-import com.metamatrix.common.buffer.storage.memory.MemoryStorageManager;
import com.metamatrix.dqp.embedded.DQPEmbeddedPlugin;
import com.metamatrix.dqp.service.BufferService;
import com.metamatrix.dqp.service.ConfigurationService;
@@ -102,7 +101,6 @@
// Set up buffer configuration properties
Properties bufferProps = new Properties();
- bufferProps.setProperty(BufferManagerPropertyNames.ID_CREATOR,
DEFAULT_ID_CREATOR);
bufferProps.setProperty(BufferManagerPropertyNames.SESSION_USE_PERCENTAGE,
DEFAULT_SESSION_USE_PERCENTAGE);
bufferProps.setProperty(BufferManagerPropertyNames.LOG_STATS_INTERVAL,
DEFAULT_LOG_STATS_INTERVAL);
bufferProps.setProperty(BufferManagerPropertyNames.MANAGEMENT_INTERVAL,
DEFAULT_MANAGEMENT_INTERVAL);
@@ -126,16 +124,13 @@
fsmProps.setProperty(BufferManagerPropertyNames.MAX_OPEN_FILES,
DEFAULT_MAX_OPEN_FILES);
StorageManager fsm = new FileStorageManager();
fsm.initialize(fsmProps);
- this.bufferMgr.addStorageManager(fsm);
+ this.bufferMgr.setStorageManager(fsm);
// start the file storrage manager in clean state
// wise FileStorageManager is smart enough to clen up after itself
cleanDirectory(bufferDir);
}
- // Add unmanaged memory storage manager
- this.bufferMgr.addStorageManager(new MemoryStorageManager());
-
} catch(MetaMatrixComponentException e) {
throw new ApplicationLifecycleException(e,
DQPEmbeddedPlugin.Util.getString("LocalBufferService.Failed_initializing_buffer_manager._8"));
//$NON-NLS-1$
} catch(IOException e) {
Modified:
trunk/server/src/main/java/com/metamatrix/server/dqp/service/PlatformBufferService.java
===================================================================
---
trunk/server/src/main/java/com/metamatrix/server/dqp/service/PlatformBufferService.java 2009-08-11
01:26:05 UTC (rev 1232)
+++
trunk/server/src/main/java/com/metamatrix/server/dqp/service/PlatformBufferService.java 2009-08-11
03:25:47 UTC (rev 1233)
@@ -33,8 +33,10 @@
import com.metamatrix.common.application.exception.ApplicationInitializationException;
import com.metamatrix.common.application.exception.ApplicationLifecycleException;
import com.metamatrix.common.buffer.BufferManager;
-import com.metamatrix.common.buffer.BufferManagerFactory;
import com.metamatrix.common.buffer.BufferManagerPropertyNames;
+import com.metamatrix.common.buffer.StorageManager;
+import com.metamatrix.common.buffer.impl.BufferManagerImpl;
+import com.metamatrix.common.buffer.storage.file.FileStorageManager;
import com.metamatrix.common.config.api.Host;
import com.metamatrix.common.util.PropertiesUtils;
import com.metamatrix.core.util.FileUtils;
@@ -87,12 +89,33 @@
props.setProperty(BufferManagerPropertyNames.BUFFER_STORAGE_DIRECTORY, dir);
try {
- bufferMgr =
BufferManagerFactory.getServerBufferManager(host.getFullName()+"-"+processName,
props); //$NON-NLS-1$
+ bufferMgr = getServerBufferManager(host.getFullName()+"-"+processName,
props); //$NON-NLS-1$
} catch (MetaMatrixComponentException e) {
throw new ApplicationLifecycleException(e);
}
}
+
+ /**
+ * Helper to get a buffer manager all set up for unmanaged standalone use. This is
+ * typically used for testing or when memory is not an issue.
+ * @param lookup Lookup implementation to use
+ * @param props Configuration properties
+ * @return BufferManager ready for use
+ */
+ public static BufferManager getServerBufferManager(String lookup, Properties props)
throws MetaMatrixComponentException {
+ Properties bmProps = PropertiesUtils.clone(props, false);
+ // Construct buffer manager
+ BufferManager bufferManager = new BufferManagerImpl();
+ bufferManager.initialize(lookup, bmProps);
+ // Get the properties for FileStorageManager and create.
+ StorageManager fsm = new FileStorageManager();
+ fsm.initialize(bmProps);
+ bufferManager.setStorageManager(fsm);
+
+ return bufferManager;
+ }
+
/*
* @see com.metamatrix.common.application.ApplicationService#stop()
*/