teiid SVN: r3577 - in trunk/engine/src: test/java/org/teiid/common/buffer/impl and 1 other directory.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-10-24 13:25:36 -0400 (Mon, 24 Oct 2011)
New Revision: 3577
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
Log:
TEIID-1750 adding better file locking and refining defrag
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java 2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockOutputStream.java 2011-10-24 17:25:36 UTC (rev 3577)
@@ -29,13 +29,29 @@
ExtensibleBufferedOutputStream {
private final BlockManager blockManager;
int blockNum = -1;
+ private final int maxBlocks;
+ private final boolean allocate;
+
+ static final IOException exceededMax = new IOException();
- BlockOutputStream(BlockManager blockManager) {
+ /**
+ * @param blockManager
+ * @param maxBlocks a max of -1 indicates use existing blocks
+ */
+ BlockOutputStream(BlockManager blockManager, int maxBlocks) {
this.blockManager = blockManager;
+ this.allocate = maxBlocks != -1;
+ this.maxBlocks = maxBlocks - 2; //convert to an index
}
-
+
@Override
- protected ByteBuffer newBuffer() {
+ protected ByteBuffer newBuffer() throws IOException {
+ if (!allocate) {
+ return blockManager.getBlock(++blockNum);
+ }
+ if (blockNum > maxBlocks) {
+ throw exceededMax;
+ }
return blockManager.allocateBlock(++blockNum);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-24 17:25:36 UTC (rev 3577)
@@ -72,11 +72,11 @@
int block = getAndSetNextClearBit(info);
int segment = block/blocksInUse.getBitsPerSegment();
boolean success = false;
- //we're using the read lock here so that defrag can lock the write out
- locks[segment].readLock().lock();
+ this.locks[segment].writeLock().lock();
try {
FileStore fs = stores[segment];
long blockOffset = (block%blocksInUse.getBitsPerSegment())*blockSize;
+ //TODO: there is still an extra buffer being created here, we could use FileChannels to do better
byte[] b = new byte[BufferFrontedFileStoreCache.BLOCK_SIZE];
int read = 0;
while ((read = is.read(b, 0, b.length)) != -1) {
@@ -85,7 +85,7 @@
}
success = true;
} finally {
- locks[segment].readLock().unlock();
+ locks[segment].writeLock().unlock();
if (!success) {
blocksInUse.clear(block);
block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-24 17:25:36 UTC (rev 3577)
@@ -41,10 +41,10 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.teiid.common.buffer.AutoCleanupUtil;
-import org.teiid.common.buffer.BaseCacheEntry;
import org.teiid.common.buffer.Cache;
import org.teiid.common.buffer.CacheEntry;
import org.teiid.common.buffer.CacheKey;
@@ -96,7 +96,7 @@
*/
public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>, StorageManager {
- private static final int DEFAULT_MIN_DEFRAG = 1 << 23;
+ private static final int DEFAULT_MIN_DEFRAG = 1 << 26;
private static final byte[] HEADER_SKIP_BUFFER = new byte[16];
private static final int EVICTION_SCANS = 5;
@@ -154,8 +154,8 @@
}
private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
- if (index >= MAX_DOUBLE_INDIRECT || (mode == Mode.ALLOCATE && index >= maxMemoryBlocks)) {
- throw new TeiidRuntimeException("Max block number exceeded. Increase the maxStorageObjectSize to support larger storage objects. Alternatively you could make the processor batch size smaller."); //$NON-NLS-1$
+ if (index >= MAX_DOUBLE_INDIRECT) {
+ throw new TeiidRuntimeException("Max block number exceeded. You could try making the processor batch size smaller."); //$NON-NLS-1$
}
int dataBlock = 0;
int position = 0;
@@ -372,25 +372,28 @@
continue;
}
try {
+ boolean sleep = false;
do {
- int blockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
+ if (sleep) {
+ Thread.sleep(100); //let the file activity quiet down
+ }
+ sleep = true;
+ int relativeBlockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
if (!shouldDefrag(blockStore, segment)) {
- //truncate the file
- blockStore.locks[segment].writeLock().lock();
- try {
- int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
- long length = endBlock * blockStore.blockSize;
- blockStore.stores[segment].setLength(length);
- } finally {
- blockStore.locks[segment].writeLock().unlock();
- }
+ truncate(blockStore, segment);
break;
}
//move the block if possible
- InputStream is = blockStore.stores[segment].createInputStream(blockToMove * blockStore.blockSize);
+ InputStream is = blockStore.stores[segment].createInputStream(relativeBlockToMove * blockStore.blockSize, blockStore.blockSize);
DataInputStream dis = new DataInputStream(is);
- Long gid = dis.readLong();
- Long oid = dis.readLong();
+ Long gid = null;
+ Long oid = null;
+ try {
+ gid = dis.readLong();
+ oid = dis.readLong();
+ } catch (IOException e) {
+ continue; //can happen if the bit was set and no data exists
+ }
dis.reset(); //move back to the beginning
Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
if (map == null) {
@@ -400,16 +403,20 @@
if (info == null) {
continue;
}
+ int bitIndex = relativeBlockToMove + (segment * blockStore.blocksInUse.getBitsPerSegment());
synchronized (info) {
- await(info, true, false);
+ info.await(true, false);
if (info.block == EMPTY_ADDRESS) {
continue;
}
- assert info.block == blockToMove;
+ if (info.block != bitIndex) {
+ //we've marked a bit in use, but haven't yet written new data
+ continue;
+ }
}
int newBlock = blockStore.writeToStorageBlock(info, dis);
synchronized (info) {
- await(info, true, true);
+ info.await(true, true);
if (info.block == EMPTY_ADDRESS) {
//already removed;
if (newBlock != EMPTY_ADDRESS) {
@@ -418,11 +425,14 @@
continue;
}
info.block = newBlock;
- blockStore.blocksInUse.clear(blockToMove);
+ blockStore.blocksInUse.clear(bitIndex);
}
+ sleep = false;
} while (shouldDefrag(blockStore, segment));
} catch (IOException e) {
LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing defrag"); //$NON-NLS-1$
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
}
}
}
@@ -430,6 +440,20 @@
defragRunning.set(false);
}
}
+
+ private void truncate(BlockStore blockStore, int segment) {
+ //truncate the file
+ blockStore.locks[segment].writeLock().lock();
+ try {
+ int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
+ long newLength = (endBlock + 1) * blockStore.blockSize;
+ blockStore.stores[segment].setLength(newLength);
+ } catch (IOException e) {
+ LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing defrag truncate"); //$NON-NLS-1$
+ } finally {
+ blockStore.locks[segment].writeLock().unlock();
+ }
+ }
};
private AtomicBoolean cleanerRunning = new AtomicBoolean();
private final Runnable cleaningTask = new Runnable() {
@@ -465,8 +489,8 @@
this.blockByteBuffer = new BlockByteBuffer(30, blocks, LOG_BLOCK_SIZE, direct);
//ensure that we'll run out of blocks first
this.inodeByteBuffer = new BlockByteBuffer(30, blocks+1, LOG_INODE_SIZE, direct);
- memoryWritePermits = new Semaphore(Math.max(1, (int)Math.min((((long)blocks)<<LOG_BLOCK_SIZE)/maxStorageObjectSize, Integer.MAX_VALUE)));
- maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, maxStorageObjectSize>>LOG_BLOCK_SIZE);
+ memoryWritePermits = new Semaphore(blocks);
+ maxMemoryBlocks = Math.min(MAX_DOUBLE_INDIRECT, blocks);
//try to maintain enough freespace so that writers don't block in cleaning
cleaningThreshold = Math.min(maxMemoryBlocks<<4, blocks>>1);
criticalCleaningThreshold = Math.min(maxMemoryBlocks<<2, blocks>>2);
@@ -480,9 +504,13 @@
}
List<BlockStore> stores = new ArrayList<BlockStore>();
int size = BLOCK_SIZE;
+ int files = 32; //this allows us to have 64 terabytes of smaller block sizes
do {
- stores.add(new BlockStore(this.storageManager, size, 30, BufferManagerImpl.CONCURRENCY_LEVEL));
+ stores.add(new BlockStore(this.storageManager, size, 30, files));
size <<=1;
+ if (files > 1) {
+ files >>= 1;
+ }
} while ((size>>1) < maxStorageObjectSize);
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
}
@@ -507,6 +535,7 @@
boolean hasPermit = false;
PhysicalInfo info = null;
boolean success = false;
+ int memoryBlocks = this.maxMemoryBlocks;
try {
Map<Long, PhysicalInfo> map = physicalMapping.get(s.getId());
if (map == null) {
@@ -538,26 +567,14 @@
}
info.adding = true;
//second chance re-add to the cache, we assume that serialization would be faster than a disk read
+ memoryBlocks = info.memoryBlockCount;
}
}
- //proactively create freespace
- if (!cleanerRunning.get()) {
- if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer cleaner"); //$NON-NLS-1$
- asynchPool.execute(cleaningTask);
- if (lowBlocks(true)) {
- //do a non-blocking removal before we're forced to block
- evictFromMemoryBuffer(false);
- }
- }
- } else if (lowBlocks(true)) {
- //do a non-blocking removal before we're forced to block
- evictFromMemoryBuffer(false);
- }
- memoryWritePermits.acquire();
+ checkForLowMemory();
+ memoryWritePermits.acquire(memoryBlocks);
hasPermit = true;
blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
- BlockOutputStream bos = new BlockOutputStream(blockManager);
+ BlockOutputStream bos = new BlockOutputStream(blockManager, memoryBlocks);
ObjectOutput dos = new DataObjectOutputStream(bos);
dos.writeLong(s.getId());
dos.writeLong(entry.getId());
@@ -577,15 +594,17 @@
}
}
} catch (Throwable e) {
- if (e == PhysicalInfo.sizeChanged) {
+ if ((e == BlockOutputStream.exceededMax && newEntry) || e == PhysicalInfo.sizeChanged) {
//entries are mutable after adding, the original should be removed shortly so just ignore
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId() +" changed size since first persistence, keeping the original."); //$NON-NLS-1$ //$NON-NLS-2$
+ } else if (e == BlockOutputStream.exceededMax){
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, "Max block number exceeded. Increase the maxStorageObjectSize to support larger storage objects. Alternatively you could make the processor batch size smaller."); //$NON-NLS-1$
} else {
LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting, attempts to read "+ entry.getId() +" later will result in an exception."); //$NON-NLS-1$ //$NON-NLS-2$
}
} finally {
if (hasPermit) {
- memoryWritePermits.release();
+ memoryWritePermits.release(memoryBlocks);
}
if (info != null) {
synchronized (info) {
@@ -602,6 +621,23 @@
}
return true;
}
+
+ private void checkForLowMemory() {
+ //proactively create freespace
+ if (!cleanerRunning.get()) {
+ if (lowBlocks(false) && cleanerRunning.compareAndSet(false, true)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Starting memory buffer cleaner"); //$NON-NLS-1$
+ asynchPool.execute(cleaningTask);
+ if (lowBlocks(true)) {
+ //do a non-blocking removal before we're forced to block
+ evictFromMemoryBuffer(false);
+ }
+ }
+ } else if (lowBlocks(true)) {
+ //do a non-blocking removal before we're forced to block
+ evictFromMemoryBuffer(false);
+ }
+ }
@Override
public PhysicalInfo lockForLoad(Long oid, Serializer<?> serializer) {
@@ -613,16 +649,7 @@
if (info == null) {
return null;
}
- synchronized (info) {
- while (info.loading) {
- try {
- info.wait();
- } catch (InterruptedException e) {
- throw new TeiidRuntimeException(e);
- }
- }
- info.loading = true;
- }
+ info.lockForLoad();
return info;
}
@@ -631,11 +658,7 @@
if (info == null) {
return;
}
- synchronized (info) {
- assert info.loading;
- info.loading = false;
- info.notifyAll();
- }
+ info.unlockForLoad();
}
@Override
@@ -650,10 +673,12 @@
return null;
}
InputStream is = null;
+ Lock lock = null;
+ int memoryBlocks = 0;
try {
synchronized (info) {
assert !info.pinned && info.loading; //load should be locked
- await(info, true, false); //not necessary, but should make things safer
+ info.await(true, false); //not necessary, but should make things safer
if (info.inode != EMPTY_ADDRESS) {
info.pinned = true;
memoryBufferEntries.touch(info);
@@ -670,13 +695,19 @@
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
}
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
- FileStore fs = blockStore.stores[info.block/blockStore.blocksInUse.getBitsPerSegment()];
+ int segment = info.block/blockStore.blocksInUse.getBitsPerSegment();
+ FileStore fs = blockStore.stores[segment];
long blockOffset = (info.block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
is = fs.createInputStream(blockOffset, info.memoryBlockCount<<LOG_BLOCK_SIZE);
+ lock = blockStore.locks[segment].writeLock();
+ memoryBlocks = info.memoryBlockCount;
} else {
return null;
}
}
+ if (lock != null) {
+ is = readIntoMemory(info, is, lock, memoryBlocks);
+ }
ObjectInput dis = new DataObjectInputStream(is);
dis.readFully(HEADER_SKIP_BUFFER);
int sizeEstimate = dis.readInt();
@@ -686,7 +717,9 @@
throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
} catch (ClassNotFoundException e) {
throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
- } finally {
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ } finally {
synchronized (info) {
info.pinned = false;
info.notifyAll();
@@ -695,6 +728,56 @@
}
/**
+ * Transfer into memory to release memory/file locks
+ */
+ private InputStream readIntoMemory(PhysicalInfo info, InputStream is,
+ Lock fileLock, int memoryBlocks) throws InterruptedException,
+ IOException {
+ checkForLowMemory();
+ this.memoryWritePermits.acquire(memoryBlocks);
+ BlockManager manager = null;
+ boolean success = false;
+ boolean locked = false;
+ try {
+ manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
+ //preallocate the memory area, to ensure we won't exhaust memory while holding
+ //the file lock
+ for (int i = 0; i < memoryBlocks; i++) {
+ manager.allocateBlock(i);
+ }
+
+ fileLock.lock();
+ locked = true;
+ ExtensibleBufferedOutputStream os = new BlockOutputStream(manager, -1);
+ //TODO: there is still an extra buffer being created here, we could use FileChannels to do better
+ int b = -1;
+ while ((b = is.read()) != -1) {
+ os.write(b);
+ }
+ fileLock.unlock();
+ locked = false;
+ synchronized (info) {
+ info.inode = manager.getInode();
+ memoryBufferEntries.add(info);
+ is = new BlockInputStream(manager, info.memoryBlockCount);
+ }
+ success = true;
+ } finally {
+ try {
+ if (locked) {
+ fileLock.unlock();
+ }
+ if (!success && manager != null) {
+ manager.free(false);
+ }
+ } finally {
+ this.memoryWritePermits.release(memoryBlocks);
+ }
+ }
+ return is;
+ }
+
+ /**
* Determine if an object should be in the memory buffer.
* Adds are indicated by a current time of 0.
* @param currentTime
@@ -778,7 +861,7 @@
//let any pending finish - it would be nice if we could pre-empt
//since we can save some work, but this should be rare enough
//to just block
- await(info, true, true);
+ info.await(true, true);
info.evicting = true;
} else {
assert info.evicting;
@@ -809,7 +892,7 @@
//it is possible for a read to happen while evicting.
//that's ok, we'll just wait for it to finish
assert info.evicting;
- await(info, true, false);
+ info.await(true, false);
info.evicting = false;
info.notifyAll();
assert bm == null || info.inode != EMPTY_ADDRESS;
@@ -833,7 +916,7 @@
if (!defragRunning.get()
&& shouldDefrag(blockStore, segment)
&& defragRunning.compareAndSet(false, true)) {
- this.asynchPool.execute(defragTask);
+ this.asynchPool.execute(defragTask);
}
info.block = EMPTY_ADDRESS;
}
@@ -849,21 +932,14 @@
boolean shouldDefrag(BlockStore blockStore, int segment) {
int highestBitSet = blockStore.blocksInUse.getHighestBitSet(segment);
int bitsSet = blockStore.blocksInUse.getBitsSet(segment);
- highestBitSet = Math.max(bitsSet, Math.max(1, highestBitSet));
+ highestBitSet = Math.max(bitsSet, Math.max(0, highestBitSet));
+ if (highestBitSet == 0) {
+ return false;
+ }
int freeBlocks = highestBitSet-bitsSet;
return freeBlocks > (highestBitSet>>2) && freeBlocks*blockStore.blockSize > minDefrag;
}
- private void await(PhysicalInfo info, boolean pinned, boolean evicting) {
- while ((pinned && info.pinned) || (evicting && info.evicting)) {
- try {
- info.wait();
- } catch (InterruptedException e) {
- throw new TeiidRuntimeException(e);
- }
- }
- }
-
/**
* Eviction routine. When space is exhausted data blocks are acquired from
* memory entries.
@@ -896,7 +972,7 @@
writeLocked = true;
}
//wait for the read/eviction to be over
- await(info, true, true);
+ info.await(true, true);
if (info.inode == EMPTY_ADDRESS) {
continue;
}
@@ -962,49 +1038,4 @@
this.minDefrag = minDefrag;
}
-}
-
-/**
- * Represents the memory buffer and storage state of an object.
- * It is important to minimize the amount of data held here.
- * Currently should be 48 bytes.
- */
-final class PhysicalInfo extends BaseCacheEntry {
-
- static final Exception sizeChanged = new Exception();
-
- final Long gid;
- //the memory inode and block count
- int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
- int memoryBlockCount;
- //the storage block and BlockStore index
- int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
- byte sizeIndex = 0;
- //state flags
- boolean pinned; //indicates that the entry is being read
- boolean evicting; //indicates that the entry will be moved out of the memory buffer
- boolean loading; //used by tier 1 cache to prevent double loads
- boolean adding; //used to prevent double adds
-
- public PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
- super(new CacheKey(id, lastAccess, 0));
- this.inode = inode;
- this.gid = gid;
- }
-
- public void setSize(int size) throws Exception {
- int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
- if (this.memoryBlockCount != 0) {
- if (newMemoryBlockCount != memoryBlockCount) {
- throw sizeChanged;
- }
- return; //no changes
- }
- this.memoryBlockCount = newMemoryBlockCount;
- while (newMemoryBlockCount > 1) {
- this.sizeIndex++;
- newMemoryBlockCount = (newMemoryBlockCount>>1) + ((newMemoryBlockCount&0x01)==0?0:1);
- }
- }
-
-}
+}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-24 17:25:36 UTC (rev 3577)
@@ -74,7 +74,7 @@
buf = null;
}
- protected abstract ByteBuffer newBuffer();
+ protected abstract ByteBuffer newBuffer() throws IOException;
/**
* Flush up to i bytes where i is the current position of the buffer
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-24 17:25:36 UTC (rev 3577)
@@ -114,6 +114,9 @@
protected synchronized int readWrite(long fileOffset, byte[] b, int offSet,
int length, boolean write) throws IOException {
if (!write) {
+ if (fileInfo == null) {
+ return -1;
+ }
try {
RandomAccessFile fileAccess = fileInfo.open();
fileAccess.seek(fileOffset);
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-24 17:25:36 UTC (rev 3577)
@@ -54,7 +54,7 @@
public LrfuEvictionQueue(AtomicLong clock) {
this.clock = clock;
- setCrfLamda(.00005); //smaller values tend to work better since we're using interval bounds
+ setCrfLamda(.1); //smaller values tend to work better since we're using interval bounds
}
public boolean remove(V value) {
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java 2011-10-24 17:25:36 UTC (rev 3577)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import org.teiid.common.buffer.BaseCacheEntry;
+import org.teiid.common.buffer.CacheKey;
+import org.teiid.core.TeiidRuntimeException;
+
+/**
+ * Represents the memory buffer and storage state of an object.
+ * It is important to minimize the amount of data held here.
+ * Currently should be 48 bytes.
+ */
+final class PhysicalInfo extends BaseCacheEntry {
+
+ static final Exception sizeChanged = new Exception();
+
+ final Long gid;
+ //the memory inode and block count
+ int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+ int memoryBlockCount;
+ //the storage block and BlockStore index
+ int block = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+ byte sizeIndex = 0;
+ //state flags
+ boolean pinned; //indicates that the entry is being read
+ boolean evicting; //indicates that the entry will be moved out of the memory buffer
+ boolean loading; //used by tier 1 cache to prevent double loads
+ boolean adding; //used to prevent double adds
+
+ PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
+ super(new CacheKey(id, lastAccess, 0));
+ this.inode = inode;
+ this.gid = gid;
+ }
+
+ void setSize(int size) throws Exception {
+ int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
+ if (this.memoryBlockCount != 0) {
+ if (newMemoryBlockCount != memoryBlockCount) {
+ throw sizeChanged;
+ }
+ return; //no changes
+ }
+ this.memoryBlockCount = newMemoryBlockCount;
+ while (newMemoryBlockCount > 1) {
+ this.sizeIndex++;
+ newMemoryBlockCount = (newMemoryBlockCount>>1) + ((newMemoryBlockCount&0x01)==0?0:1);
+ }
+ }
+
+ void await(boolean donePinning, boolean doneEvicting) {
+ while ((donePinning && pinned) || (doneEvicting && evicting)) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ }
+
+ synchronized void lockForLoad() {
+ while (loading) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+ loading = true;
+ }
+
+ synchronized void unlockForLoad() {
+ assert loading;
+ loading = false;
+ notifyAll();
+ }
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/PhysicalInfo.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-10-22 11:40:30 UTC (rev 3576)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-10-24 17:25:36 UTC (rev 3577)
@@ -49,7 +49,14 @@
sm.initialize();
return sm;
}
-
+
+ @Test public void testInitialRead() throws Exception {
+ FileStorageManager sm = getStorageManager(null, null);
+ String tsID = "0"; //$NON-NLS-1$
+ FileStore store = sm.createFileStore(tsID);
+ assertEquals(-1, store.read(0, new byte[1], 0, 1));
+ }
+
@Test public void testWrite() throws Exception {
FileStorageManager sm = getStorageManager(null, null);
String tsID = "0"; //$NON-NLS-1$
13 years, 2 months
teiid SVN: r3576 - in trunk: engine/src/main/java/org/teiid/common/buffer/impl and 3 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-10-22 07:40:30 -0400 (Sat, 22 Oct 2011)
New Revision: 3576
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
Log:
TEIID-1750 further refining serialization and adding defrag
Added: trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public abstract class ExtensibleBufferedInputStream extends InputStream {
+ ByteBuffer buf;
+
+ @Override
+ public int read() throws IOException {
+ if (!ensureBytes()) {
+ return -1;
+ }
+ return buf.get() & 0xff;
+ }
+
+ private boolean ensureBytes() throws IOException {
+ if (buf == null || buf.remaining() == 0) {
+ buf = nextBuffer();
+ if (buf == null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ protected abstract ByteBuffer nextBuffer() throws IOException;
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (!ensureBytes()) {
+ return -1;
+ }
+ len = Math.min(len, buf.remaining());
+ buf.get(b, off, len);
+ return len;
+ }
+
+ @Override
+ public void reset() throws IOException {
+ if (buf != null) {
+ buf.rewind();
+ }
+ }
+
+}
\ No newline at end of file
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/ExtensibleBufferedInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -189,36 +190,31 @@
protected abstract void removeDirect();
public InputStream createInputStream(final long start, final long length) { //stream over this store beginning at start; a length of -1 means no length limit is applied
- return new InputStream() {
+ return new ExtensibleBufferedInputStream() {
private long offset = start; //next position in the store to read from
private long streamLength = length; //bytes still to be served, or -1 when unbounded
+ private ByteBuffer bb = ByteBuffer.allocate(1<<13); //8K array backed buffer that is refilled and handed out on every call
@Override
- public int read() throws IOException {
- byte[] buffer = new byte[1];
- int read = read(buffer, 0, 1);
- if (read == -1) {
- return -1;
- }
- return buffer[0];
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException {
+ protected ByteBuffer nextBuffer() throws IOException {
+ int len = bb.capacity(); //try to fill the whole buffer
if (this.streamLength != -1 && len > this.streamLength) {
len = (int)this.streamLength; //never read past the requested length
}
if (this.streamLength == -1 || this.streamLength > 0) {
- int bytes = FileStore.this.read(offset, b, off, len);
- if (bytes != -1) {
- this.offset += bytes;
- if (this.streamLength != -1) {
- this.streamLength -= bytes;
- }
+ int bytes = FileStore.this.read(offset, bb.array(), 0, len);
+ if (bytes == -1) {
+ return null; //the store has no more data
}
- return bytes;
+ bb.rewind(); //expose exactly the bytes that were just read
+ bb.limit(bytes);
+ this.offset += bytes;
+ if (this.streamLength != -1) {
+ this.streamLength -= bytes;
+ }
+ return bb;
}
- return -1;
+ return null; //the requested length has been fully served
}
};
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStoreInputStreamFactory.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -22,7 +22,6 @@
package org.teiid.common.buffer;
-import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -50,8 +49,7 @@
if (fsos != null && !fsos.bytesWritten()) {
return new ByteArrayInputStream(fsos.getBuffer(), 0, fsos.getCount());
}
- //TODO: adjust the buffer size, and/or develop a shared buffer strategy
- return new BufferedInputStream(lobBuffer.createInputStream(0));
+ return lobBuffer.createInputStream(0);
}
@Override
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Serializer.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -23,16 +23,16 @@
package org.teiid.common.buffer;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
/**
* Responsible for serializing {@link CacheEntry}s
* @param <T>
*/
public interface Serializer<T> {
- void serialize(T obj, ObjectOutputStream oos) throws IOException;
- T deserialize(ObjectInputStream ois) throws IOException, ClassNotFoundException;
+ void serialize(T obj, ObjectOutput oos) throws IOException; //takes the ObjectOutput interface so callers are not tied to ObjectOutputStream
+ T deserialize(ObjectInput ois) throws IOException, ClassNotFoundException; //takes the ObjectInput interface so callers are not tied to ObjectInputStream
boolean useSoftCache();
Long getId(); //BufferFrontedFileStoreCache writes this id in each entry header and uses it as the key of its group map
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -22,18 +22,17 @@
package org.teiid.common.buffer.impl;
-import java.io.InputStream;
import java.nio.ByteBuffer;
+import org.teiid.common.buffer.ExtensibleBufferedInputStream;
+
/**
* TODO: support freeing of datablocks as we go
*/
-final class BlockInputStream extends InputStream {
+final class BlockInputStream extends ExtensibleBufferedInputStream {
private final BlockManager manager;
private final int maxBlock;
int blockIndex;
- ByteBuffer buf;
- boolean done;
BlockInputStream(BlockManager manager, int blockCount) {
this.manager = manager;
@@ -41,33 +40,11 @@
}
@Override
- public int read() {
- ensureBytes();
- if (done) {
- return -1;
+ protected ByteBuffer nextBuffer() { //hands out the manager's blocks one at a time in index order
+ if (maxBlock == blockIndex) {
+ return null; //every block up to maxBlock has been handed out
}
- return buf.get() & 0xff;
+ return manager.getBlock(blockIndex++);
}
-
- private void ensureBytes() {
- if (buf == null || buf.remaining() == 0) {
- if (maxBlock == blockIndex) {
- done = true;
- return;
- }
- buf = manager.getBlock(blockIndex++);
- }
- }
-
- @Override
- public int read(byte[] b, int off, int len) {
- ensureBytes();
- if (done) {
- return -1;
- }
- len = Math.min(len, buf.remaining());
- buf.get(b, off, len);
- return len;
- }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockStore.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -22,8 +22,16 @@
package org.teiid.common.buffer.impl;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.teiid.common.buffer.FileStore;
import org.teiid.common.buffer.StorageManager;
+import org.teiid.core.TeiidRuntimeException;
+import org.teiid.logging.LogConstants;
+import org.teiid.logging.LogManager;
+import org.teiid.logging.MessageLevel;
/**
* Represents a FileStore that holds blocks of a fixed size.
@@ -32,6 +40,7 @@
final long blockSize;
final ConcurrentBitSet blocksInUse;
final FileStore[] stores;
+ final ReentrantReadWriteLock[] locks;
public BlockStore(StorageManager storageManager, int blockSize, int blockCountLog, int concurrencyLevel) {
this.blockSize = blockSize;
@@ -39,9 +48,50 @@
this.blocksInUse = new ConcurrentBitSet(blockCount, concurrencyLevel);
this.blocksInUse.setCompact(true);
this.stores = new FileStore[concurrencyLevel];
+ this.locks = new ReentrantReadWriteLock[concurrencyLevel];
for (int i = 0; i < stores.length; i++) {
- this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) + '_' + i);
+ this.stores[i] = storageManager.createFileStore(String.valueOf(blockSize) + '_' + i);
+ this.locks[i] = new ReentrantReadWriteLock();
}
+
}
+
+ /**
+  * Claims the next free block of this store.
+  * @param info the entry the block is being allocated for, used only in the detail log message
+  * @return the index of the claimed block as reported by blocksInUse
+  * @throws TeiidRuntimeException if blocksInUse reports that no block is free
+  */
+ int getAndSetNextClearBit(PhysicalInfo info) {
+     final int allocated = blocksInUse.getAndSetNextClearBit();
+     if (allocated != -1) {
+         boolean detailEnabled = LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL);
+         if (detailEnabled) {
+             LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating storage data block", allocated, "of size", blockSize, "to", info); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+         }
+         return allocated;
+     }
+     //-1 means that every bit is already set
+     throw new TeiidRuntimeException("Out of blocks of size " + blockSize); //$NON-NLS-1$
+ }
+
+ /**
+  * Allocates a block and copies the whole stream into it.
+  * @param info the entry being written, passed on to the allocation for logging
+  * @param is the data to store; it is read until the end of the stream
+  * @return the index of the block that now holds the data
+  * @throws IOException if reading or writing fails, in which case the allocated block is released again
+  */
+ int writeToStorageBlock(PhysicalInfo info,
+         InputStream is) throws IOException {
+     int allocated = getAndSetNextClearBit(info);
+     final int segment = allocated / blocksInUse.getBitsPerSegment();
+     final ReentrantReadWriteLock segmentLock = locks[segment];
+     boolean written = false;
+     //writers share the read lock; the defrag takes the write lock to keep all of them out while it truncates
+     segmentLock.readLock().lock();
+     try {
+         FileStore store = stores[segment];
+         long position = (allocated % blocksInUse.getBitsPerSegment()) * blockSize;
+         byte[] chunk = new byte[BufferFrontedFileStoreCache.BLOCK_SIZE];
+         for (int count = is.read(chunk, 0, chunk.length); count != -1; count = is.read(chunk, 0, chunk.length)) {
+             store.write(position, chunk, 0, count);
+             position += count;
+         }
+         written = true;
+     } finally {
+         segmentLock.readLock().unlock();
+         if (!written) {
+             //the copy failed, hand the block back
+             blocksInUse.clear(allocated);
+             allocated = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
+         }
+     }
+     return allocated;
+ }
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -22,10 +22,11 @@
package org.teiid.common.buffer.impl;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -89,17 +90,14 @@
* The root directory "physicalMapping" is held in memory for performance. It will grow in
* proportion to the number of tables/tuplebuffers in use.
*
- * TODO: compact tail storage blocks. there may be dangling blocks causing us to consume disk space.
- * we should at least reclaim tail space if the end block is removed. for now we are just relying
- * on the compact option of {@link ConcurrentBitSet} to keep the blocks at the start of the
- * files.
- *
* The locking is as fine grained as possible to prevent contention. See {@link PhysicalInfo} for
* flags that are used when it is used as a lock. It is important to not access the
* group maps when a {@link PhysicalInfo} lock is held.
*/
public class BufferFrontedFileStoreCache implements Cache<PhysicalInfo>, StorageManager {
+ private static final int DEFAULT_MIN_DEFRAG = 1 << 23;
+ private static final byte[] HEADER_SKIP_BUFFER = new byte[16];
private static final int EVICTION_SCANS = 5;
public static final int DEFAuLT_MAX_OBJECT_SIZE = 1 << 23;
@@ -355,9 +353,85 @@
//root directory
private ConcurrentHashMap<Long, Map<Long, PhysicalInfo>> physicalMapping = new ConcurrentHashMap<Long, Map<Long, PhysicalInfo>>(16, .75f, BufferManagerImpl.CONCURRENCY_LEVEL);
private BlockStore[] sizeBasedStores;
-
+
+ private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(2, "FileStore Worker"); //$NON-NLS-1$
+ private AtomicBoolean defragRunning = new AtomicBoolean();
+ //defrag to release free space held by the storage files: while a segment is fragmented enough
+ //(see shouldDefrag) its highest block is rewritten into a newly allocated block, then the file is truncated
+ private final Runnable defragTask = new Runnable() {
+
+     @Override
+     public void run() {
+         try {
+             if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+                 LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Running defrag"); //$NON-NLS-1$
+             }
+             for (int i = 0; i < sizeBasedStores.length; i++) {
+                 BlockStore blockStore = sizeBasedStores[i];
+                 for (int segment = 0; segment < blockStore.stores.length; segment++) {
+                     if (!shouldDefrag(blockStore, segment)) {
+                         continue;
+                     }
+                     try {
+                         do {
+                             //NOTE(review): compactHighestBitSet returns an index relative to the segment, while
+                             //info.block and blocksInUse.clear look like absolute indexes - confirm for segments other than 0
+                             int blockToMove = blockStore.blocksInUse.compactHighestBitSet(segment);
+                             if (!shouldDefrag(blockStore, segment)) {
+                                 //truncate the file; the write lock keeps writeToStorageBlock callers out
+                                 blockStore.locks[segment].writeLock().lock();
+                                 try {
+                                     int endBlock = blockStore.blocksInUse.getHighestBitSet(segment);
+                                     //endBlock is the index of the highest block in use (-1 if none), so the file
+                                     //has to keep endBlock + 1 blocks or the data of that block would be cut off
+                                     long length = (endBlock + 1) * blockStore.blockSize;
+                                     blockStore.stores[segment].setLength(length);
+                                 } finally {
+                                     blockStore.locks[segment].writeLock().unlock();
+                                 }
+                                 break;
+                             }
+                             //move the block if possible
+                             InputStream is = blockStore.stores[segment].createInputStream(blockToMove * blockStore.blockSize);
+                             DataInputStream dis = new DataInputStream(is);
+                             //every stored entry starts with its group id and object id
+                             Long gid = dis.readLong();
+                             Long oid = dis.readLong();
+                             dis.reset(); //move back to the beginning
+                             //NOTE(review): each continue below jumps to the while condition without freeing or
+                             //moving blockToMove - confirm the same block cannot be selected again indefinitely
+                             Map<Long, PhysicalInfo> map = physicalMapping.get(gid);
+                             if (map == null) {
+                                 continue;
+                             }
+                             PhysicalInfo info = map.get(oid);
+                             if (info == null) {
+                                 continue;
+                             }
+                             synchronized (info) {
+                                 //wait until the entry is no longer pinned
+                                 await(info, true, false);
+                                 if (info.block == EMPTY_ADDRESS) {
+                                     continue;
+                                 }
+                                 assert info.block == blockToMove;
+                             }
+                             int newBlock = blockStore.writeToStorageBlock(info, dis);
+                             synchronized (info) {
+                                 //wait until the entry is neither pinned nor being evicted before switching blocks
+                                 await(info, true, true);
+                                 if (info.block == EMPTY_ADDRESS) {
+                                     //already removed, so the copy is not needed
+                                     if (newBlock != EMPTY_ADDRESS) {
+                                         blockStore.blocksInUse.clear(newBlock);
+                                     }
+                                     continue;
+                                 }
+                                 info.block = newBlock;
+                                 blockStore.blocksInUse.clear(blockToMove);
+                             }
+                         } while (shouldDefrag(blockStore, segment));
+                     } catch (IOException e) {
+                         //give up on this segment only and carry on with the next one
+                         LogManager.logWarning(LogConstants.CTX_BUFFER_MGR, e, "Error performing defrag"); //$NON-NLS-1$
+                     }
+                 }
+             }
+         } finally {
+             //allow the next defrag to be scheduled
+             defragRunning.set(false);
+         }
+     }
+ };
private AtomicBoolean cleanerRunning = new AtomicBoolean();
- private ExecutorService asynchPool = ExecutorUtils.newFixedThreadPool(1, "FileStore Worker"); //$NON-NLS-1$
private final Runnable cleaningTask = new Runnable() {
@Override
@@ -379,6 +453,8 @@
private AtomicLong storageWrites = new AtomicLong();
private AtomicLong storageReads = new AtomicLong();
+ private long minDefrag = DEFAULT_MIN_DEFRAG;
+
@Override
public void initialize() throws TeiidComponentException {
storageManager.initialize();
@@ -482,10 +558,12 @@
hasPermit = true;
blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
BlockOutputStream bos = new BlockOutputStream(blockManager);
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeInt(entry.getSizeEstimate());
- s.serialize(entry.getObject(), oos);
- oos.close();
+ ObjectOutput dos = new DataObjectOutputStream(bos);
+ dos.writeLong(s.getId());
+ dos.writeLong(entry.getId());
+ dos.writeInt(entry.getSizeEstimate());
+ s.serialize(entry.getObject(), dos);
+ dos.close();
//synchronized to ensure proper cleanup from a concurrent removal
synchronized (map) {
if (physicalMapping.containsKey(s.getId()) && map.containsKey(entry.getId())) {
@@ -585,6 +663,7 @@
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
is = new BlockInputStream(manager, info.memoryBlockCount);
} else if (info.block != EMPTY_ADDRESS) {
+ info.pinned = true;
memoryBufferEntries.recordAccess(info);
storageReads.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
@@ -598,8 +677,10 @@
return null;
}
}
- ObjectInputStream ois = new ObjectInputStream(is);
- CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), ois.readInt(), serializer.deserialize(ois), ref, true);
+ ObjectInput dis = new DataObjectInputStream(is);
+ dis.readFully(HEADER_SKIP_BUFFER);
+ int sizeEstimate = dis.readInt();
+ CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), sizeEstimate, serializer.deserialize(dis), ref, true);
return ce;
} catch(IOException e) {
throw new TeiidComponentException(e, QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid)); //$NON-NLS-1$
@@ -718,26 +799,10 @@
storageWrites.getAndIncrement();
BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
BlockStore blockStore = sizeBasedStores[sizeIndex];
- block = getAndSetNextClearBit(blockStore);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
- LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Allocating storage data block", info.block, "of size", blockStore.blockSize, "to", info); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
- }
- FileStore fs = blockStore.stores[block/blockStore.blocksInUse.getBitsPerSegment()];
- long blockOffset = (block%blockStore.blocksInUse.getBitsPerSegment())*blockStore.blockSize;
- byte[] b = new byte[BLOCK_SIZE];
- int read = 0;
- try {
- while ((read = is.read(b, 0, b.length)) != -1) {
- fs.write(blockOffset, b, 0, read);
- blockOffset+=read;
- }
- } catch (Throwable e) {
- //shouldn't happen, but we'll invalidate this write and continue
- demote = false;
- //just continue to free
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
- }
+ block = blockStore.writeToStorageBlock(info, is);
}
+ } catch (IOException e) {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error transferring block to storage " + oid); //$NON-NLS-1$
} finally {
//ensure post conditions
synchronized (info) {
@@ -764,6 +829,12 @@
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Freed storage data block", info.block, "of size", blockStore.blockSize); //$NON-NLS-1$ //$NON-NLS-2$
}
+ int segment = info.block/blockStore.blocksInUse.getBitsPerSegment();
+ if (!defragRunning.get()
+ && shouldDefrag(blockStore, segment)
+ && defragRunning.compareAndSet(false, true)) {
+ this.asynchPool.execute(defragTask);
+ }
info.block = EMPTY_ADDRESS;
}
}
@@ -775,6 +846,14 @@
return result;
}
+ /**
+  * Decides whether a segment is fragmented enough to be worth a defrag.
+  * @return true if more than a quarter of the span up to the highest bit is free
+  * and that free space, in blocks times block size, exceeds minDefrag
+  */
+ boolean shouldDefrag(BlockStore blockStore, int segment) {
+     final ConcurrentBitSet usage = blockStore.blocksInUse;
+     final int reportedHighest = usage.getHighestBitSet(segment);
+     final int used = usage.getBitsSet(segment);
+     //the span is never less than 1 nor less than the number of bits in use
+     final int span = Math.max(used, Math.max(1, reportedHighest));
+     final int free = span - used;
+     if (free <= (span >> 2)) {
+         return false;
+     }
+     return free * blockStore.blockSize > minDefrag;
+ }
+
private void await(PhysicalInfo info, boolean pinned, boolean evicting) {
while ((pinned && info.pinned) || (evicting && info.evicting)) {
try {
@@ -785,14 +864,6 @@
}
}
- static int getAndSetNextClearBit(BlockStore bs) {
- int result = bs.blocksInUse.getAndSetNextClearBit();
- if (result == -1) {
- throw new TeiidRuntimeException("Out of blocks of size " + bs.blockSize); //$NON-NLS-1$
- }
- return result;
- }
-
/**
* Eviction routine. When space is exhausted data blocks are acquired from
* memory entries.
@@ -886,6 +957,10 @@
public long getMemoryBufferSpace() {
return memoryBufferSpace;
}
+
+ public void setMinDefrag(long minDefrag) { //threshold that shouldDefrag compares against freeBlocks*blockSize of a segment
+ this.minDefrag = minDefrag;
+ }
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -23,8 +23,8 @@
package org.teiid.common.buffer.impl;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
@@ -81,6 +81,8 @@
*
* TODO: add detection of pinned batches to prevent unnecessary purging of non-persistent batches
* - this is not necessary for already persistent batches, since we hold a weak reference
+ *
+ * TODO: add a pre-fetch for tuplebuffers or some built-in correlation logic with the queue.
*/
public class BufferManagerImpl implements BufferManager, StorageManager {
@@ -214,7 +216,7 @@
}
@Override
- public List<? extends List<?>> deserialize(ObjectInputStream ois)
+ public List<? extends List<?>> deserialize(ObjectInput ois)
throws IOException, ClassNotFoundException {
List<? extends List<?>> batch = BatchSerializer.readBatch(ois, types);
if (lobManager != null) {
@@ -231,7 +233,7 @@
@Override
public void serialize(List<? extends List<?>> obj,
- ObjectOutputStream oos) throws IOException {
+ ObjectOutput oos) throws IOException {
int expectedModCount = 0;
ResizingArrayList<?> list = null;
if (obj instanceof ResizingArrayList<?>) {
@@ -626,7 +628,7 @@
} else {
waitCount >>= 1;
}
- int result = noWaitReserve(additional - committed);
+ int result = noWaitReserve(additional - committed, false);
committed += result;
}
return committed;
@@ -645,16 +647,19 @@
if (mode == BufferReserveMode.FORCE) {
this.reserveBatchKB.addAndGet(-count);
} else {
- result = noWaitReserve(count);
+ result = noWaitReserve(count, true);
}
reservedByThread.set(reservedByThread.get() + result);
persistBatchReferences();
return result;
}
- private int noWaitReserve(int count) {
+ private int noWaitReserve(int count, boolean allOrNothing) {
for (int i = 0; i < 2; i++) {
int reserveBatch = this.reserveBatchKB.get();
+ if (allOrNothing && count > reserveBatch) {
+ return 0;
+ }
count = Math.min(count, Math.max(0, reserveBatch));
if (count == 0) {
return 0;
@@ -828,9 +833,12 @@
void addMemoryEntry(CacheEntry ce, boolean initial) {
persistBatchReferences();
synchronized (ce) {
- memoryEntries.put(ce.getId(), ce);
+ boolean added = memoryEntries.put(ce.getId(), ce) == null;
if (initial) {
evictionQueue.add(ce);
+ } else if (added) {
+ evictionQueue.recordAccess(ce);
+ evictionQueue.add(ce);
} else {
evictionQueue.touch(ce);
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -33,6 +33,7 @@
*/
public class ConcurrentBitSet {
+ private static final int CONCURRENT_MODIFICATION = -2;
private static final int ADDRESS_BITS_PER_TOP_VALUE = 18;
private static final int MAX_TOP_VALUE = 1 << ADDRESS_BITS_PER_TOP_VALUE;
@@ -40,6 +41,7 @@
int offset;
int maxBits;
int startSearch;
+ int highestBitSet = -1;
int bitsSet;
int[] topVals;
final BitSet bitSet;
@@ -58,6 +60,10 @@
private Segment[] segments;
private boolean compact;
+ /**
+ * @param maxBits
+ * @param concurrencyLevel - should be a power of 2
+ */
public ConcurrentBitSet(int maxBits, int concurrencyLevel) {
Assertion.assertTrue(maxBits > 0);
while ((bitsPerSegment = maxBits/concurrencyLevel) < concurrencyLevel) {
@@ -108,10 +114,34 @@
return counter.getAndIncrement();
}
- public int getAndSetNextClearBit(int start) {
+ /**
+  * Returns the number of bits set in a segment. The counter is read without
+  * synchronization, so the value is only an estimate.
+  * @param segment the segment index, masked with segments.length-1
+  * @return the segment's bitsSet counter
+  */
+ public int getBitsSet(int segment) {
+     final int mask = segments.length - 1;
+     return segments[segment & mask].bitsSet;
+ }
+
+ /**
+  * Returns the highest bit recorded as set in a segment, as an index relative to
+  * that segment. The value is read without synchronization, so it is only an estimate.
+  * @param segment the segment index, masked with segments.length-1
+  * @return the segment's highestBitSet value
+  */
+ public int getHighestBitSet(int segment) {
+     final int mask = segments.length - 1;
+     return segments[segment & mask].highestBitSet;
+ }
+
+ /**
+ * @param segment
+ * @return the next clear bit index as an absolute index - not relative to a segment
+ */
+ public int getAndSetNextClearBit(int segment) {
int nextBit = -1;
for (int i = 0; i < segments.length; i++) {
- Segment s = segments[(start+i)&(segments.length-1)];
+ Segment s = segments[(segment+i)&(segments.length-1)];
synchronized (s) {
if (s.bitsSet == s.maxBits) {
continue;
@@ -122,7 +152,7 @@
continue;
}
if (s.topVals[j] == 0) {
- if (j == start) {
+ if (j == segment) {
nextBit = s.startSearch;
break;
}
@@ -148,6 +178,7 @@
s.bitsSet++;
s.bitSet.set(nextBit);
s.startSearch = nextBit + 1;
+ s.highestBitSet = Math.max(s.highestBitSet, nextBit);
if (s.startSearch == s.maxBits) {
s.startSearch = 0;
}
@@ -187,4 +218,84 @@
this.compact = compact;
}
+
+ /**
+  * Brings the recorded highest bit of a segment back in line with the bits that are set.
+  * Up to three attempts are made without holding the segment lock for the whole scan;
+  * if all of them report a concurrent modification a final attempt is made under the lock.
+  * @param segment the segment index, masked with segments.length-1
+  * @return the result of the first attempt that does not report a concurrent modification
+  */
+ public int compactHighestBitSet(int segment) {
+     final Segment target = segments[segment & (segments.length - 1)];
+     int attemptsLeft = 3;
+     while (attemptsLeft-- > 0) {
+         int outcome = tryCompactHighestBitSet(target);
+         if (outcome == CONCURRENT_MODIFICATION) {
+             continue;
+         }
+         return outcome;
+     }
+     synchronized (target) {
+         return tryCompactHighestBitSet(target);
+     }
+ }
+
+ private int tryCompactHighestBitSet(Segment s) { //one attempt to lower s.highestBitSet; returns CONCURRENT_MODIFICATION if s.highestBitSet changed while scanning
+ int highestBitSet = 0;
+ synchronized (s) {
+ highestBitSet = s.highestBitSet; //snapshot that the later checks compare against
+ if (highestBitSet <= 0) {
+ return 0; //NOTE(review): 0 is also returned when highestBitSet is -1 - confirm callers can tell this apart from block 0
+ }
+ if (s.bitSet.get(highestBitSet)) {
+ return highestBitSet; //the recorded bit is still set, nothing to do
+ }
+ }
+ int indexSearchStart = highestBitSet >> ADDRESS_BITS_PER_TOP_VALUE; //topVals slot that contains the recorded highest bit
+ for (int j = indexSearchStart; j >= 0; j--) { //walk the slots downward without holding the lock
+ if (s.topVals[j] == 0) { //presumably topVals[j] is the count of bits set in slot j - TODO confirm
+ if (j==0) {
+ synchronized (s) {
+ if (s.highestBitSet != highestBitSet) {
+ return CONCURRENT_MODIFICATION;
+ }
+ s.highestBitSet = -1; //no slot had a non-zero value
+ }
+ }
+ continue;
+ }
+ if (s.topVals[j] == MAX_TOP_VALUE) {
+ synchronized (s) {
+ if (s.highestBitSet != highestBitSet) {
+ return CONCURRENT_MODIFICATION;
+ }
+ s.highestBitSet = ((j + 1) * MAX_TOP_VALUE) -1; //last bit index of slot j
+ }
+ break; //NOTE(review): this path falls through to the final return -1 instead of returning the new highestBitSet - confirm intended
+ }
+ int index = j * MAX_TOP_VALUE; //first bit index of slot j
+ int end = index + s.maxBits;
+ if (j == indexSearchStart) {
+ end = highestBitSet; //in the starting slot only the bits below the recorded highest bit are examined
+ }
+ BitSet bs = s.bitSet;
+ int offset = 0;
+ if (j == indexSearchStart) {
+ bs = s.bitSet.get(index, end); //ensures that we look only at a subset of the words
+ offset = index; //the copy is indexed from 0, offset maps its indexes back
+ }
+ index = index - offset;
+ end = end - offset - 1;
+ while (index < end) { //step from set bit to set bit
+ int next = bs.nextSetBit(index);
+ if (next == -1) {
+ index--; //index was one past the last set bit that was found
+ break;
+ }
+ index = next + 1;
+ } //NOTE(review): if the loop ends because index reached end, index is one past the last set bit found - confirm this is not off by one
+ synchronized (s) {
+ if (s.highestBitSet != highestBitSet) {
+ return CONCURRENT_MODIFICATION;
+ }
+ s.highestBitSet = index + offset;
+ return s.highestBitSet;
+ }
+ }
+ return -1;
+ }
+
}
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -0,0 +1,105 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectInputStream;
+import java.io.ObjectStreamClass;
+import java.io.StreamCorruptedException;
+
+public class DataObjectInputStream extends DataInputStream implements ObjectInput { //reads primitives directly and objects through a lazily created ObjectInputStream using the DataObjectOutputStream descriptor codes
+
+ ObjectInput ois; //created on the first readObject call
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); //context class loader of the thread that created this stream
+
+ public DataObjectInputStream(InputStream in) {
+ super(in);
+ }
+
+ @Override
+ public Object readObject() throws ClassNotFoundException, IOException {
+ if (ois == null) {
+ ois = new ObjectInputStream(this) {
+
+ @Override
+ protected void readStreamHeader() throws IOException,
+ StreamCorruptedException {
+ int version = readByte() & 0xFF; //the header is a single version byte
+ if (version != STREAM_VERSION) {
+ throw new StreamCorruptedException("Unsupported version: " + version); //$NON-NLS-1$
+ }
+ }
+
+ @Override
+ protected ObjectStreamClass readClassDescriptor()
+ throws IOException, ClassNotFoundException {
+ int type = read(); //leading byte that selects the descriptor form
+ if (type < 0) {
+ throw new EOFException();
+ }
+ switch (type) {
+ case DataObjectOutputStream.TYPE_FAT_DESCRIPTOR:
+ return super.readClassDescriptor(); //a standard descriptor follows
+ case DataObjectOutputStream.TYPE_THIN_DESCRIPTOR:
+ String className = readUTF(); //only the class name follows
+ Class<?> clazz = loadClass(className);
+ return ObjectStreamClass.lookup(clazz);
+ default:
+ className = DataObjectOutputStream.typeMapping.get((byte)type); //any other value is looked up as a byte id in the static typeMapping
+ if (className == null) {
+ throw new StreamCorruptedException("Unknown class type " + type); //$NON-NLS-1$
+ }
+ clazz = loadClass(className);
+ return ObjectStreamClass.lookup(clazz);
+ }
+ }
+
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
+ String className = desc.getName();
+ try {
+ return loadClass(className);
+ } catch (ClassNotFoundException ex) {
+ return super.resolveClass(desc); //fall back to the default resolution
+ }
+ }
+
+ protected Class<?> loadClass(String className) throws ClassNotFoundException {
+ Class<?> clazz;
+ if (classLoader != null) {
+ clazz = classLoader.loadClass(className);
+ } else {
+ clazz = Class.forName(className); //no context class loader was available
+ }
+ return clazz;
+ }
+ };
+ }
+ return ois.readObject();
+ }
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java (rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -0,0 +1,99 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.io.OutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Extends the logic of Netty's CompactObjectOutputStream to use byte identifiers
+ * for some classes and to write/flush objects directly into the output.
+ *
+ * We can do this since buffer serialized data is ephemeral and good only for
+ * a single process.
+ *
+ * Class descriptors are written in one of three forms, selected by a leading byte:
+ * {@link #TYPE_FAT_DESCRIPTOR} followed by the standard descriptor (primitives and arrays),
+ * {@link #TYPE_THIN_DESCRIPTOR} followed by the class name, or a byte id that was
+ * registered for the class name in the static maps.
+ */
+public class DataObjectOutputStream extends DataOutputStream implements ObjectOutput {
+
+    /** Byte ids are handed out from the counter only while it is below this value. */
+    private static final int MAX_BYTE_IDS = 254;
+    /** Next byte id to hand out; starts at 2 since 0 and 1 are the descriptor type codes. */
+    static AtomicInteger counter = new AtomicInteger(2);
+    /** class name -> byte id, consulted when writing */
+    static final ConcurrentHashMap<String, Byte> knownClasses = new ConcurrentHashMap<String, Byte>();
+    /** byte id -> class name, consulted when reading */
+    static final ConcurrentHashMap<Byte, String> typeMapping = new ConcurrentHashMap<Byte, String>();
+
+    static final int TYPE_FAT_DESCRIPTOR = 0;
+    static final int TYPE_THIN_DESCRIPTOR = 1;
+
+    /** Created on the first writeObject call. */
+    ObjectOutputStream oos;
+
+    public DataObjectOutputStream(OutputStream out) {
+        super(out);
+    }
+
+    /**
+     * Writes the object through the nested object stream and flushes it
+     * so that its bytes reach this stream before any further direct writes.
+     */
+    @Override
+    public void writeObject(Object obj) throws IOException {
+        if (oos == null) {
+            oos = new ObjectOutputStream(this) {
+                @Override
+                protected void writeStreamHeader() throws IOException {
+                    //a single version byte instead of the standard header
+                    writeByte(STREAM_VERSION);
+                }
+
+                @Override
+                protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
+                    Class<?> clazz = desc.forClass();
+                    if (clazz.isPrimitive() || clazz.isArray()) {
+                        write(TYPE_FAT_DESCRIPTOR);
+                        super.writeClassDescriptor(desc);
+                    } else {
+                        String name = desc.getName();
+                        Byte b = knownClasses.get(name);
+                        if (b == null && counter.get() < MAX_BYTE_IDS) {
+                            synchronized (DataObjectOutputStream.class) {
+                                //check again under the lock, another thread may have registered the class
+                                b = knownClasses.get(name);
+                                if (b == null && counter.get() < MAX_BYTE_IDS) {
+                                    b = (byte)counter.getAndIncrement();
+                                    //publish the id -> name entry first: the lookup above is not synchronized,
+                                    //so once the name -> id entry is visible another thread may write the id
+                                    //and a reader must already be able to resolve it
+                                    typeMapping.put(b, name);
+                                    knownClasses.put(name, b);
+                                }
+                            }
+                        }
+                        if (b != null) {
+                            write(b);
+                        } else {
+                            //no byte id available, fall back to the class name
+                            write(TYPE_THIN_DESCRIPTOR);
+                            writeUTF(name);
+                        }
+                    }
+                }
+            };
+        }
+        oos.writeObject(obj);
+        oos.flush();
+    }
+
+}
Property changes on: trunk/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -37,17 +37,18 @@
public void write(int b) throws IOException {
ensureBuffer();
- if (buf.remaining() == 0) {
- flush();
- }
buf.put((byte)b);
}
- private void ensureBuffer() {
- if (buf == null) {
- buf = newBuffer();
- startPosition = buf.position();
- }
+ /**
+ * Makes sure buf can accept at least one more byte. If a buffer exists and still has
+ * room nothing is done; if it exists but is full it is flushed first. In the remaining
+ * cases a buffer is obtained from newBuffer() and its position is stored in startPosition.
+ * @throws IOException declared because flush() and newBuffer() are invoked here
+ */
+ private void ensureBuffer() throws IOException {
+ if (buf != null) {
+ if (buf.remaining() != 0) {
+ return;
+ }
+ // current buffer is full: flush it before replacing it
+ flush();
+ }
+ buf = newBuffer();
+ startPosition = buf.position();
}
public void write(byte b[], int off, int len) throws IOException {
@@ -60,7 +61,6 @@
if (buf.remaining() > 0) {
break;
}
- flush();
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -34,7 +34,6 @@
/**
* A Concurrent LRFU eviction queue. Has assumptions that match buffermanager usage.
* Null values are not allowed.
- * @param <K>
* @param <V>
*/
public class LrfuEvictionQueue<V extends BaseCacheEntry> {
@@ -49,11 +48,13 @@
//TODO: adaptively adjust this value. more hits should move closer to lru
protected double crfLamda;
protected double inverseCrfLamda = 1 - crfLamda;
- protected int maxInterval;
+ protected int maxInterval; //don't consider the old ordering value after the maxInterval
+ protected int minInterval; //cap the frequency gain under this interval (we can make some values too hot otherwise)
+ private float minVal;
public LrfuEvictionQueue(AtomicLong clock) {
this.clock = clock;
- setCrfLamda(.0002);
+ setCrfLamda(.00005); //smaller values tend to work better since we're using interval bounds
}
public boolean remove(V value) {
@@ -110,7 +111,7 @@
long delta = currentTime - longLastAccess;
orderingValue =
(float) (//Frequency component
- (delta>maxInterval?0:orderingValue*Math.pow(inverseCrfLamda, delta))
+ (delta<maxInterval?(delta<minInterval?minVal:Math.pow(inverseCrfLamda, delta)):0)*orderingValue
//recency component
+ Math.pow(currentTime, crfLamda));
return orderingValue;
@@ -125,9 +126,14 @@
this.inverseCrfLamda = 1 - crfLamda;
int i = 0;
for (; i < 30; i++) {
- if ((float)Math.pow(inverseCrfLamda, 1<<i) == 0) {
+ float val = (float)Math.pow(inverseCrfLamda, 1<<i);
+ if (val == 0) {
break;
}
+ if (val > .8) {
+ minInterval = 1<<i;
+ this.minVal = val;
+ }
}
this.maxInterval = 1<<(i-1);
}
Modified: trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/main/java/org/teiid/query/processor/relational/SortUtility.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -298,7 +298,7 @@
TupleBuffer merged = createTupleBuffer();
int desiredSpace = activeTupleBuffers.size() * schemaSize;
- int reserved = Math.min(desiredSpace, this.bufferManager.getMaxProcessingKB());
+ int reserved = Math.min(desiredSpace, Math.max(2*schemaSize, this.bufferManager.getMaxProcessingKB()));
bufferManager.reserveBuffers(reserved, BufferReserveMode.FORCE);
if (desiredSpace > reserved) {
reserved += bufferManager.reserveAdditionalBuffers(desiredSpace - reserved);
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -25,8 +25,8 @@
import static org.junit.Assert.*;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
import java.lang.ref.WeakReference;
import org.junit.Test;
@@ -38,7 +38,7 @@
private final class SimpleSerializer implements Serializer<Integer> {
@Override
- public Integer deserialize(ObjectInputStream ois)
+ public Integer deserialize(ObjectInput ois)
throws IOException, ClassNotFoundException {
Integer result = ois.readInt();
for (int i = 0; i < result; i++) {
@@ -53,7 +53,7 @@
}
@Override
- public void serialize(Integer obj, ObjectOutputStream oos)
+ public void serialize(Integer obj, ObjectOutput oos)
throws IOException {
oos.writeInt(obj);
for (int i = 0; i < obj; i++) {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestConcurrentBitSet.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -81,4 +81,34 @@
assertEquals(50, bst.getAndSetNextClearBit());
}
+ /**
+ * Exercises getHighestBitSet and compactHighestBitSet on a bit set created with
+ * (1 << 19, 1) and compact mode enabled.
+ */
+ @Test public void testCompactHighest() {
+ ConcurrentBitSet bst = new ConcurrentBitSet(1 << 19, 1);
+ bst.setCompact(true);
+ // request a clear bit once per available bit
+ for (int i = 0; i < bst.getTotalBits(); i++) {
+ bst.getAndSetNextClearBit();
+ }
+ assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(0));
+ assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(1));
+
+ // clear the top 20 bits
+ for (int i = bst.getTotalBits()-20; i < bst.getTotalBits(); i++) {
+ bst.clear(i);
+ }
+
+ assertEquals(bst.getTotalBits()-21, bst.compactHighestBitSet(0));
+
+ // take 20 bits again; the next request is then expected to return -1
+ for (int i = bst.getTotalBits()-20; i < bst.getTotalBits(); i++) {
+ bst.getAndSetNextClearBit();
+ }
+
+ assertEquals(-1, bst.getAndSetNextClearBit());
+
+ // clear everything from bit 20 upwards
+ for (int i = 20; i < bst.getTotalBits(); i++) {
+ bst.clear(i);
+ }
+
+ // getHighestBitSet is expected to keep reporting the old value; compactHighestBitSet is expected to report 19
+ assertEquals(bst.getTotalBits()-1, bst.getHighestBitSet(0));
+ assertEquals(19, bst.compactHighestBitSet(0));
+
+ }
+
}
Modified: trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-10-21 20:18:54 UTC (rev 3575)
+++ trunk/runtime/src/main/java/org/teiid/transport/PgBackendProtocol.java 2011-10-22 11:40:30 UTC (rev 3576)
@@ -113,19 +113,20 @@
while (true) {
try {
nextFuture = rs.submitNext();
- if (!nextFuture.isDone()) {
- nextFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
- @Override
- public void onCompletion(ResultsFuture<Boolean> future) {
- if (processRow(future)) {
- if (rowsSent != rows2Send) {
- //this can be recursive, but ideally won't be called many times
- ResultsWorkItem.this.run();
- }
- }
- }
- });
- return;
+ synchronized (nextFuture) {
+ if (!nextFuture.isDone()) {
+ nextFuture.addCompletionListener(new ResultsFuture.CompletionListener<Boolean>() {
+ @Override
+ public void onCompletion(ResultsFuture<Boolean> future) {
+ if (processRow(future)) {
+ if (rowsSent != rows2Send) {
+ ResultsWorkItem.this.run();
+ }
+ }
+ }
+ });
+ return;
+ }
}
if (!processRow(nextFuture)) {
break;
13 years, 2 months
teiid SVN: r3575 - in trunk/engine/src: test/java/org/teiid/query/parser and 1 other directory.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-10-21 16:18:54 -0400 (Fri, 21 Oct 2011)
New Revision: 3575
Modified:
trunk/engine/src/main/javacc/org/teiid/query/parser/SQLParser.jj
trunk/engine/src/test/java/org/teiid/query/parser/TestParser.java
Log:
TEIID-1790 fix for substring parsing
Modified: trunk/engine/src/main/javacc/org/teiid/query/parser/SQLParser.jj
===================================================================
--- trunk/engine/src/main/javacc/org/teiid/query/parser/SQLParser.jj 2011-10-20 17:09:57 UTC (rev 3574)
+++ trunk/engine/src/main/javacc/org/teiid/query/parser/SQLParser.jj 2011-10-21 20:18:54 UTC (rev 3575)
@@ -3588,6 +3588,7 @@
Expression expression = null;
ArrayList args = new ArrayList(2);
+ ArrayList otherArgs = null;
Token funcToken = null;
}
{
@@ -3620,14 +3621,14 @@
<RPAREN>
)
|
- LOOKAHEAD(4, {getToken(1).image.equalsIgnoreCase("SUBSTRING")}) (
+ LOOKAHEAD(2, {getToken(1).image.equalsIgnoreCase("SUBSTRING")}) (
funcName = nonReserved("SUBSTRING")
<LPAREN>
expression = expression(info)
{
args.add(expression);
}
- <FROM> expression = expression(info)
+ ((<FROM> expression = expression(info)
{
args.add(expression);
}
@@ -3635,7 +3636,7 @@
{
args.add(expression);
}
- ]
+ ])|(<COMMA> otherArgs = expressionList(info)) {args.addAll(otherArgs);})
<RPAREN>
)
|
Modified: trunk/engine/src/test/java/org/teiid/query/parser/TestParser.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/parser/TestParser.java 2011-10-20 17:09:57 UTC (rev 3574)
+++ trunk/engine/src/test/java/org/teiid/query/parser/TestParser.java 2011-10-21 20:18:54 UTC (rev 3575)
@@ -6862,5 +6862,11 @@
@Test public void testTrim1() {
helpException("select trim('xy' from e1) from pm1.g1");
}
+
+ /**
+ * Parses a query that calls substring with comma-separated arguments and a nested
+ * function call as its first argument, and checks that the parsed command prints
+ * back as the same SQL string.
+ */
+ @Test public void testSubString() throws QueryParserException {
+ Query actualCommand = (Query)QueryParser.getQueryParser().parseCommand("SELECT substring(RTRIM(MED.BATDAT), 4, 4) FROM FCC.MEDMAS AS MED", new ParseInfo());
+ String actualString = actualCommand.toString();
+ assertEquals("SELECT substring(RTRIM(MED.BATDAT), 4, 4) FROM FCC.MEDMAS AS MED", actualString);
+ }
}
13 years, 2 months
teiid SVN: r3574 - in branches/as7: admin/src/main/java/org/teiid/adminapi and 7 other directories.
by teiid-commits@lists.jboss.org
Author: rareddy
Date: 2011-10-20 13:09:57 -0400 (Thu, 20 Oct 2011)
New Revision: 3574
Modified:
branches/as7/admin/src/main/java/org/teiid/adminapi/Admin.java
branches/as7/admin/src/main/java/org/teiid/adminapi/AdminFactory.java
branches/as7/admin/src/main/java/org/teiid/adminapi/impl/VDBMetaData.java
branches/as7/adminshell/src/main/java/org/teiid/adminshell/AdminShell.java
branches/as7/api/src/main/java/org/teiid/translator/ExecutionFactory.java
branches/as7/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCExecutionFactory.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidServiceNames.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TranslatorAdd.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TranslatorService.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TransportAdd.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBDeployer.java
branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBService.java
branches/as7/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
branches/as7/pom.xml
branches/as7/runtime/src/main/java/org/teiid/deployers/VDBStatusChecker.java
Log:
TEIID-1720: corrected VDB dependencies on the data sources and admin api corrections
Modified: branches/as7/admin/src/main/java/org/teiid/adminapi/Admin.java
===================================================================
--- branches/as7/admin/src/main/java/org/teiid/adminapi/Admin.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/admin/src/main/java/org/teiid/adminapi/Admin.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -287,11 +287,4 @@
* @throws AdminException
*/
Set<String> getDataSourceTemplateNames() throws AdminException;
-
- /**
- * Tell the engine that the given source is available. Pending dynamic vdb metadata loads will be resumed.
- * @param jndiName
- * @throws AdminException
- */
- void markDataSourceAvailable(String jndiName) throws AdminException;
}
Modified: branches/as7/admin/src/main/java/org/teiid/adminapi/AdminFactory.java
===================================================================
--- branches/as7/admin/src/main/java/org/teiid/adminapi/AdminFactory.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/admin/src/main/java/org/teiid/adminapi/AdminFactory.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -27,6 +27,8 @@
import java.io.IOException;
import java.io.InputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.net.UnknownHostException;
import java.util.*;
@@ -41,6 +43,8 @@
import org.jboss.as.controller.client.ModelControllerClient;
import org.jboss.as.protocol.old.StreamUtils;
import org.jboss.dmr.ModelNode;
+import org.jboss.dmr.ModelType;
+import org.teiid.adminapi.PropertyDefinition.RestartType;
import org.teiid.adminapi.VDB.ConnectionType;
import org.teiid.adminapi.impl.*;
import org.teiid.adminapi.impl.VDBMetadataMapper.RequestMetadataMapper;
@@ -187,23 +191,10 @@
}
}
- private void createConnectionFactoryRequest(String deploymentName, String templateName, Properties properties) throws AdminException {
- DefaultOperationRequestBuilder builder = new DefaultOperationRequestBuilder();
- try {
- builder.operationName("add");
- builder.addNode("subsystem", "resource-adapters"); //$NON-NLS-1$ //$NON-NLS-2$
- builder.addNode("resource-adapter", templateName); //$NON-NLS-1$
-
- builder.addProperty("archive", templateName);
- builder.addProperty("transaction-support", properties.getProperty("transaction-support", "NoTransaction"));
- properties.remove("transaction-support");
-
-
- } catch (OperationFormatException e) {
- throw new IllegalStateException("Failed to build operation", e); //$NON-NLS-1$
- }
-
-
+ /**
+ * Adds the "archive" and "transaction-support" properties to the supplied builder.
+ * "transaction-support" is read from properties (defaulting to "NoTransaction") and
+ * is then removed from properties, so the caller's Properties object is modified.
+ * @param deploymentName not used by this method
+ * @param templateName value written as the "archive" property
+ * @param properties source of "transaction-support"; that key is removed
+ * @param builder request builder that receives the two properties
+ */
+ private void createConnectionFactoryRequest(String deploymentName, String templateName, Properties properties, DefaultOperationRequestBuilder builder) throws AdminException {
+ builder.addProperty("archive", templateName);
+ builder.addProperty("transaction-support", properties.getProperty("transaction-support", "NoTransaction"));
+ properties.remove("transaction-support");
}
@Override
@@ -223,9 +214,10 @@
else if (templateName.equals("resource-adapters")) {
builder.addNode("subsystem", "resource-adapters"); //$NON-NLS-1$ //$NON-NLS-2$
builder.addNode("resource-adapter", deploymentName); //$NON-NLS-1$
+ createConnectionFactoryRequest(deploymentName, templateName, properties, builder);
}
- builder.operationName("add");
+ builder.setOperationName("add");
request = builder.buildRequest();
builder.addProperty("jndi-name", "java:/"+deploymentName);
@@ -433,7 +425,7 @@
try {
builder.addNode("subsystem", "resource-adapters"); //$NON-NLS-1$ //$NON-NLS-2$
builder.addNode("resource-adapter", resource); //$NON-NLS-1$ //$NON-NLS-2$
- builder.operationName("read-resource");
+ builder.setOperationName("read-resource");
ModelNode request = builder.buildRequest();
ModelNode outcome = this.connection.execute(request);
@@ -490,7 +482,7 @@
@Override
public WorkerPoolStatistics getWorkerPoolStats() throws AdminException {
- final ModelNode request = buildEngineRequest("workerpool-statistics");//$NON-NLS-1$
+ final ModelNode request = buildRequest("teiid", "workerpool-statistics");//$NON-NLS-1$
if (request != null) {
try {
ModelNode outcome = this.connection.execute(request);
@@ -510,7 +502,7 @@
@Override
public void cancelRequest(String sessionId, long executionId) throws AdminException {
- final ModelNode request = buildEngineRequest("terminate-session", "session", sessionId, "execution-id", String.valueOf(executionId));//$NON-NLS-1$
+ final ModelNode request = buildRequest("teiid", "terminate-session", "session", sessionId, "execution-id", String.valueOf(executionId));//$NON-NLS-1$
if (request == null) {
return;
}
@@ -526,7 +518,7 @@
@Override
public Collection<? extends Request> getRequests() throws AdminException {
- final ModelNode request = buildEngineRequest("list-requests");//$NON-NLS-1$
+ final ModelNode request = buildRequest("teiid", "list-requests");//$NON-NLS-1$
if (request != null) {
try {
ModelNode outcome = this.connection.execute(request);
@@ -542,7 +534,7 @@
@Override
public Collection<? extends Request> getRequestsForSession(String sessionId) throws AdminException {
- final ModelNode request = buildEngineRequest("requests-per-session", "session", sessionId);//$NON-NLS-1$
+ final ModelNode request = buildRequest("teiid", "requests-per-session", "session", sessionId);//$NON-NLS-1$
if (request != null) {
try {
ModelNode outcome = this.connection.execute(request);
@@ -558,7 +550,7 @@
@Override
public Collection<? extends Session> getSessions() throws AdminException {
- final ModelNode request = buildEngineRequest("list-sessions");//$NON-NLS-1$
+ final ModelNode request = buildRequest("teiid", "list-sessions");//$NON-NLS-1$
if (request != null) {
try {
ModelNode outcome = this.connection.execute(request);
@@ -572,15 +564,133 @@
return Collections.emptyList();
}
- @Override
+ /**
+ * Reads the property definitions of a template by executing a
+ * "read-resource-description" operation and converting the described attributes.
+ * <p>
+ * pattern on CLI
+ * /subsystem=datasources/data-source=foo:read-resource-description
+ * <p>
+ * For "data-source" and "xa-data-source" the request addresses the datasources subsystem
+ * with the resource name "any" and the attributes are read from "attributes"; for any
+ * other template name it addresses the resource adapter of that name and reads
+ * "connection-definitions" / "attributes".
+ *
+ * @param templateName "data-source", "xa-data-source", or a resource adapter name
+ * @return the converted property definitions (a new list)
+ * @throws AdminProcessingException if the operation does not succeed or an IOException occurs while executing it
+ * @throws IllegalStateException if the operation request cannot be built
+ */
+ @Override
public Collection<PropertyDefinition> getTemplatePropertyDefinitions(String templateName) throws AdminException {
- // rameshTODO Auto-generated method stub
- return null;
+ DefaultOperationRequestBuilder builder = new DefaultOperationRequestBuilder();
+ ModelNode request = null;
+ try {
+ // data-source,xa-data-source,resource-adapters
+ if (templateName.equals("data-source")) {
+ builder.addNode("subsystem", "datasources"); //$NON-NLS-1$ //$NON-NLS-2$
+ builder.addNode("data-source", "any"); //$NON-NLS-1$
+ }
+ else if (templateName.equals("xa-data-source")) {
+ builder.addNode("subsystem", "datasources"); //$NON-NLS-1$ //$NON-NLS-2$
+ builder.addNode("xa-data-source", "any"); //$NON-NLS-1$
+ }
+ else {
+ builder.addNode("subsystem", "resource-adapters"); //$NON-NLS-1$ //$NON-NLS-2$
+ builder.addNode("resource-adapter", templateName); //$NON-NLS-1$
+ }
+
+ builder.setOperationName("read-resource-description");
+ request = builder.buildRequest();
+ } catch (OperationFormatException e) {
+ throw new IllegalStateException("Failed to build operation", e); //$NON-NLS-1$
+ }
+
+ ModelNode result = null;
+ try {
+ ModelNode outcome = this.connection.execute(request);
+ if (!Util.isSuccess(outcome)) {
+ throw new AdminProcessingException(Util.getFailureDescription(outcome));
+ }
+ result = outcome.get("result");
+ } catch (IOException e) {
+ throw new AdminProcessingException(e);
+ }
+
+ ArrayList<PropertyDefinition> propDefinitions = new ArrayList<PropertyDefinition>();
+ List<ModelNode> propsNodes = null;
+ if (templateName.equals("data-source") || templateName.equals("xa-data-source")) {
+ propsNodes = result.get("attributes").asList();
+ }
+ else {
+ propsNodes = result.get("connection-definitions", "attributes").asList();
+ }
+
+ for (ModelNode node:propsNodes) {
+ PropertyDefinitionMetadata def = new PropertyDefinitionMetadata();
+ Set<String> keys = node.keys();
+
+ // the first key is taken as the attribute name; its value holds the description fields
+ String name = keys.iterator().next();
+ def.setName(name);
+ node = node.get(name);
+
+ if (node.hasDefined("description")) {
+ def.setDescription(node.get("description").asString());
+ }
+
+ if (node.hasDefined("required")) {
+ def.setRequired(node.get("required").asBoolean());
+ }
+
+ if (node.hasDefined("access-type")) {
+ String access = node.get("access-type").asString();
+ if ("read-only".equals(access)) {
+ def.setModifiable(false);
+ }
+ else if ("read-write".equals(access)) {
+ def.setModifiable(true);
+ }
+ }
+
+ // NOTE(review): any defined "restart-required" value maps to CLUSTER; the value itself is not inspected - confirm this is intended
+ if (node.hasDefined("restart-required")) {
+ def.setRequiresRestart(RestartType.CLUSTER);
+ }
+
+ // types other than the ones listed below leave the type class name and the default value unset
+ String type = node.get("type").asString();
+ if (ModelType.STRING.name().equals(type)) {
+ def.setPropertyTypeClassName(String.class.getName());
+ }
+ else if (ModelType.INT.name().equals(type)) {
+ def.setPropertyTypeClassName(Integer.class.getName());
+ }
+ else if (ModelType.LONG.name().equals(type)) {
+ def.setPropertyTypeClassName(Long.class.getName());
+ }
+ else if (ModelType.BOOLEAN.name().equals(type)) {
+ def.setPropertyTypeClassName(Boolean.class.getName());
+ }
+ else if (ModelType.BIG_INTEGER.name().equals(type)) {
+ def.setPropertyTypeClassName(BigInteger.class.getName());
+ }
+ else if (ModelType.BIG_DECIMAL.name().equals(type)) {
+ def.setPropertyTypeClassName(BigDecimal.class.getName());
+ }
+
+ if (node.hasDefined("default")) {
+ if (ModelType.STRING.name().equals(type)) {
+ def.setDefaultValue(node.get("default").asString());
+ }
+ else if (ModelType.INT.name().equals(type)) {
+ def.setDefaultValue(node.get("default").asInt());
+ }
+ else if (ModelType.LONG.name().equals(type)) {
+ def.setDefaultValue(node.get("default").asLong());
+ }
+ else if (ModelType.BOOLEAN.name().equals(type)) {
+ def.setDefaultValue(node.get("default").asBoolean());
+ }
+ else if (ModelType.BIG_INTEGER.name().equals(type)) {
+ def.setDefaultValue(node.get("default").asBigInteger());
+ }
+ else if (ModelType.BIG_DECIMAL.name().equals(type)) {
+ def.setDefaultValue(node.get("default").asBigDecimal());
+ }
+ }
+ propDefinitions.add(def);
+ }
+ return propDefinitions;
}
@Override
public Collection<? extends Transaction> getTransactions() throws AdminException {
- final ModelNode request = buildEngineRequest("list-transactions");//$NON-NLS-1$
+ final ModelNode request = buildRequest("teiid", "list-transactions");//$NON-NLS-1$
if (request != null) {
try {
ModelNode outcome = this.connection.execute(request);
@@ -596,7 +706,7 @@
@Override
public void terminateSession(String sessionId) throws AdminException {
- final ModelNode request = buildEngineRequest("terminate-session", "session", sessionId);//$NON-NLS-1$
+ final ModelNode request = buildRequest("teiid", "terminate-session", "session", sessionId);//$NON-NLS-1$
if (request == null) {
return;
}
@@ -612,7 +722,7 @@
@Override
public void terminateTransaction(String transactionId) throws AdminException {
- final ModelNode request = buildEngineRequest("terminate-transaction", "xid", transactionId);//$NON-NLS-1$
+ final ModelNode request = buildRequest("teiid", "terminate-transaction", "xid", transactionId);//$NON-NLS-1$
if (request == null) {
return;
}
@@ -662,20 +772,20 @@
return Collections.emptyList();
}
- private List<String> getEngines(ModelControllerClient client) {
+ public List<String> getTransports() {
DefaultOperationRequestBuilder builder = new DefaultOperationRequestBuilder();
final ModelNode request;
try {
builder.addNode("subsystem", "teiid"); //$NON-NLS-1$ //$NON-NLS-2$
- builder.operationName("read-children-names");
- builder.addProperty("child-type", "query-engine");
+ builder.setOperationName("read-children-names");
+ builder.addProperty("child-type", "transport");
request = builder.buildRequest();
} catch (OperationFormatException e) {
throw new IllegalStateException("Failed to build operation", e);
}
try {
- ModelNode outcome = client.execute(request);
+ ModelNode outcome = this.connection.execute(request);
if (Util.isSuccess(outcome)) {
return Util.getList(outcome);
}
@@ -685,41 +795,12 @@
return Collections.emptyList();
}
- private ModelNode buildEngineRequest(String operationName, String... params) {
- ModelNode composite = new ModelNode();
- composite.get("operation").set("composite");
- composite.get("address").setEmptyList();
- ModelNode steps = composite.get("steps");
-
- List<String> engines = getEngines(this.connection);
-
- for (String engine:engines) {
- DefaultOperationRequestBuilder builder = new DefaultOperationRequestBuilder();
- final ModelNode request;
- try {
- builder.addNode("subsystem", "teiid"); //$NON-NLS-1$ //$NON-NLS-2$
- builder.addNode("query-engine", engine); //$NON-NLS-1$ //$NON-NLS-2$
- builder.operationName(operationName);
- request = builder.buildRequest();
- if (params != null && params.length % 2 == 0) {
- for (int i = 0; i < params.length; i+=2) {
- builder.addProperty(params[i], params[i+1]);
- }
- }
- steps.add(request);
- } catch (OperationFormatException e) {
- throw new IllegalStateException("Failed to build operation", e); //$NON-NLS-1$
- }
- }
- return composite;
- }
-
private ModelNode buildRequest(String subsystem, String operationName, String... params) {
DefaultOperationRequestBuilder builder = new DefaultOperationRequestBuilder();
final ModelNode request;
try {
builder.addNode("subsystem", subsystem); //$NON-NLS-1$ //$NON-NLS-2$
- builder.operationName(operationName);
+ builder.setOperationName(operationName);
request = builder.buildRequest();
if (params != null && params.length % 2 == 0) {
for (int i = 0; i < params.length; i+=2) {
@@ -804,12 +885,6 @@
}
@Override
- public void markDataSourceAvailable(String jndiName) throws AdminException {
- // rameshTODO Auto-generated method stub
-
- }
-
- @Override
public void mergeVDBs(String sourceVDBName, int sourceVDBVersion,
String targetVDBName, int targetVDBVersion)
throws AdminException {
Modified: branches/as7/admin/src/main/java/org/teiid/adminapi/impl/VDBMetaData.java
===================================================================
--- branches/as7/admin/src/main/java/org/teiid/adminapi/impl/VDBMetaData.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/admin/src/main/java/org/teiid/adminapi/impl/VDBMetaData.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -220,6 +220,10 @@
this.translators.getMap().put(t.getName(), t);
}
+ /**
+ * Checks whether this VDB's translator map contains an entry for the given name.
+ * @param name translator name to look up
+ * @return true if the translators map has that key
+ */
+ public boolean isOverideTranslator(String name) {
+ return this.translators.getMap().containsKey(name);
+ }
+
@Override
public String getDescription() {
return this.description;
Modified: branches/as7/adminshell/src/main/java/org/teiid/adminshell/AdminShell.java
===================================================================
--- branches/as7/adminshell/src/main/java/org/teiid/adminshell/AdminShell.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/adminshell/src/main/java/org/teiid/adminshell/AdminShell.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -413,11 +413,4 @@
@Doc(text = "method name") String method) {
help.help(method);
}
-
- @Doc(text = "Tell the engine that the given source is available. Pending dynamic vdb metadata loads will be resumed.")
- public static void markDataSourceAvailable(
- @Doc(text = "jndi name") String name) throws AdminException {
- getAdmin().markDataSourceAvailable(name);
- }
-
}
Modified: branches/as7/api/src/main/java/org/teiid/translator/ExecutionFactory.java
===================================================================
--- branches/as7/api/src/main/java/org/teiid/translator/ExecutionFactory.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/api/src/main/java/org/teiid/translator/ExecutionFactory.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -208,7 +208,15 @@
public void setSourceRequired(boolean value) {
this.sourceRequired = value;
- }
+ }
+
+ /**
+ * Flag that tells whether the underlying connection is a data source or a connection factory.
+ * This implementation always returns false.
+ * @return false
+ */
+ public boolean isJDBCSource() {
+ return false;
+ }
/**
* Obtain a reference to the default LanguageFactory that can be used to construct
Modified: branches/as7/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCExecutionFactory.java
===================================================================
--- branches/as7/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCExecutionFactory.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/connectors/translator-jdbc/src/main/java/org/teiid/translator/jdbc/JDBCExecutionFactory.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -224,6 +224,11 @@
return true;
}
+ /**
+ * Always returns true for this execution factory.
+ */
+ @Override
+ public boolean isJDBCSource() {
+ return true;
+ }
+
@Override
public ResultSetExecution createResultSetExecution(QueryExpression command, ExecutionContext executionContext, RuntimeMetadata metadata, Connection conn)
throws TranslatorException {
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidAdd.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -31,6 +31,7 @@
import java.util.Locale;
import java.util.ResourceBundle;
import java.util.ServiceLoader;
+import java.util.concurrent.Executor;
import javax.resource.spi.XATerminator;
import javax.resource.spi.work.WorkManager;
@@ -66,6 +67,7 @@
import org.teiid.common.buffer.BufferManager;
import org.teiid.deployers.SystemVDBDeployer;
import org.teiid.deployers.VDBRepository;
+import org.teiid.deployers.VDBStatusChecker;
import org.teiid.dqp.internal.datamgr.TranslatorRepository;
import org.teiid.dqp.internal.process.AuthorizationValidator;
import org.teiid.dqp.internal.process.CachedResults;
@@ -190,6 +192,18 @@
VDBRepositoryService vdbRepositoryService = new VDBRepositoryService(vdbRepository);
newControllers.add(target.addService(TeiidServiceNames.VDB_REPO, vdbRepositoryService).install());
+ // VDB Status manager
+ final VDBStatusChecker statusChecker = new VDBStatusChecker(vdbRepository);
+ ValueService<VDBStatusChecker> statusService = new ValueService<VDBStatusChecker>(new org.jboss.msc.value.Value<VDBStatusChecker>() {
+ @Override
+ public VDBStatusChecker getValue() throws IllegalStateException, IllegalArgumentException {
+ return statusChecker;
+ }
+ });
+ ServiceBuilder<VDBStatusChecker> statusBuilder = target.addService(TeiidServiceNames.VDB_STATUS_CHECKER, statusService);
+ statusBuilder.addDependency(TeiidServiceNames.executorServiceName(asyncThreadPoolName), Executor.class, statusChecker.getExecutorInjector());
+ newControllers.add(statusBuilder.install());
+
// System VDB Service
SystemVDBDeployer systemVDB = new SystemVDBDeployer();
systemVDB.setVDBRepository(vdbRepository);
@@ -307,7 +321,7 @@
processorTarget.addDeploymentProcessor(Phase.STRUCTURE, Phase.STRUCTURE_WAR_DEPLOYMENT_INIT|0x0001,new VDBStructureDeployer());
processorTarget.addDeploymentProcessor(Phase.PARSE, Phase.PARSE_WEB_DEPLOYMENT|0x0001, new VDBParserDeployer());
processorTarget.addDeploymentProcessor(Phase.DEPENDENCIES, Phase.DEPENDENCIES_WAR_MODULE|0x0001, new VDBDependencyDeployer());
- processorTarget.addDeploymentProcessor(Phase.INSTALL, Phase.INSTALL_WAR_DEPLOYMENT|0x0001, new VDBDeployer(translatorRepo, asyncThreadPoolName));
+ processorTarget.addDeploymentProcessor(Phase.INSTALL, Phase.INSTALL_WAR_DEPLOYMENT|0x0001, new VDBDeployer(translatorRepo, asyncThreadPoolName, statusChecker));
}
}, OperationContext.Stage.RUNTIME);
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidServiceNames.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidServiceNames.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TeiidServiceNames.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -39,8 +39,9 @@
public static ServiceName CACHE_RESULTSET = ServiceName.JBOSS.append("teiid", "cache", "resultset"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
public static ServiceName CACHE_PREPAREDPLAN = ServiceName.JBOSS.append("teiid", "cache", "prepared-plan"); //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
public static ServiceName OBJECT_REPLICATOR = ServiceName.JBOSS.append("teiid", "object-replicator"); //$NON-NLS-1$ //$NON-NLS-2$
+ public static ServiceName VDB_STATUS_CHECKER = ServiceName.JBOSS.append("teiid", "vdb-status-checker"); //$NON-NLS-1$ //$NON-NLS-2$
+ public static ServiceName DS_LISTENER_BASE = ServiceName.JBOSS.append("teiid", "ds-listener"); //$NON-NLS-1$ //$NON-NLS-2$
-
public static ServiceName translatorServiceName(String name) {
return ServiceName.of(TRANSLATOR_BASE, name);
}
@@ -56,4 +57,8 @@
public static ServiceName transportServiceName(String name) {
return ServiceName.of(TRANSPORT_BASE, name);
}
+
+ /**
+ * Builds a service name from DS_LISTENER_BASE and the given name.
+ * @param name name passed to ServiceName.of together with DS_LISTENER_BASE
+ */
+ public static ServiceName dsListenerServiceName(String name) {
+ return ServiceName.of(DS_LISTENER_BASE, name);
+ }
}
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TranslatorAdd.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TranslatorAdd.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TranslatorAdd.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -47,6 +47,7 @@
import org.jboss.msc.service.ServiceTarget;
import org.teiid.adminapi.impl.VDBTranslatorMetaData;
import org.teiid.deployers.TranslatorUtil;
+import org.teiid.deployers.VDBStatusChecker;
import org.teiid.dqp.internal.datamgr.TranslatorRepository;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
@@ -116,6 +117,7 @@
TranslatorService translatorService = new TranslatorService(metadata);
ServiceBuilder<VDBTranslatorMetaData> builder = target.addService(TeiidServiceNames.translatorServiceName(metadata.getName()), translatorService);
builder.addDependency(TeiidServiceNames.TRANSLATOR_REPO, TranslatorRepository.class, translatorService.repositoryInjector);
+ builder.addDependency(TeiidServiceNames.VDB_STATUS_CHECKER, VDBStatusChecker.class, translatorService.statusCheckerInjector);
newControllers.add(builder.setInitialMode(ServiceController.Mode.ACTIVE).install());
added = true;
}
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TranslatorService.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TranslatorService.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TranslatorService.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -28,13 +28,15 @@
import org.jboss.msc.service.StopContext;
import org.jboss.msc.value.InjectedValue;
import org.teiid.adminapi.impl.VDBTranslatorMetaData;
+import org.teiid.deployers.VDBStatusChecker;
import org.teiid.dqp.internal.datamgr.TranslatorRepository;
class TranslatorService implements Service<VDBTranslatorMetaData> {
private VDBTranslatorMetaData translator;
final InjectedValue<TranslatorRepository> repositoryInjector = new InjectedValue<TranslatorRepository>();
-
+ final InjectedValue<VDBStatusChecker> statusCheckerInjector = new InjectedValue<VDBStatusChecker>();
+
public TranslatorService(VDBTranslatorMetaData translator) {
this.translator = translator;
}
@@ -42,11 +44,13 @@
@Override
public void start(StartContext context) throws StartException {
this.repositoryInjector.getValue().addTranslatorMetadata(this.translator.getName(), this.translator);
+ this.statusCheckerInjector.getValue().translatorAdded(this.translator.getName());
}
@Override
public void stop(StopContext context) {
this.repositoryInjector.getValue().removeTranslatorMetadata(this.translator.getName());
+ this.statusCheckerInjector.getValue().translatorRemoved(this.translator.getName());
}
@Override
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TransportAdd.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TransportAdd.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/TransportAdd.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -43,7 +43,7 @@
import org.jboss.as.controller.ServiceVerificationHandler;
import org.jboss.as.controller.descriptions.DescriptionProvider;
import org.jboss.as.naming.ManagedReferenceFactory;
-import org.jboss.as.naming.NamingStore;
+import org.jboss.as.naming.ServiceBasedNamingStore;
import org.jboss.as.naming.deployment.ContextNames;
import org.jboss.as.naming.service.BinderService;
import org.jboss.as.network.SocketBinding;
@@ -200,7 +200,7 @@
final BinderService embedded = new BinderService(bindInfo.getBindName());
final ServiceBuilder<?> embeddedBinderBuilder = target.addService(bindInfo.getBinderServiceName(), embedded);
embeddedBinderBuilder.addDependency(referenceFactoryServiceName, ManagedReferenceFactory.class, embedded.getManagedObjectInjector());
- embeddedBinderBuilder.addDependency(bindInfo.getParentContextServiceName(), NamingStore.class, embedded.getNamingStoreInjector());
+ embeddedBinderBuilder.addDependency(bindInfo.getParentContextServiceName(), ServiceBasedNamingStore.class, embedded.getNamingStoreInjector());
embeddedBinderBuilder.setInitialMode(ServiceController.Mode.ACTIVE);
newControllers.add(referenceBuilder.install());
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBDeployer.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBDeployer.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBDeployer.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -22,6 +22,8 @@
package org.teiid.jboss;
import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.Executor;
@@ -33,10 +35,11 @@
import org.jboss.as.server.deployment.DeploymentUnitProcessingException;
import org.jboss.as.server.deployment.DeploymentUnitProcessor;
import org.jboss.modules.Module;
-import org.jboss.msc.service.ServiceBuilder;
+import org.jboss.msc.service.*;
import org.jboss.msc.service.ServiceBuilder.DependencyType;
-import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceController.Mode;
+import org.jboss.msc.service.ServiceController.State;
+import org.teiid.adminapi.Model;
import org.teiid.adminapi.Translator;
import org.teiid.adminapi.impl.ModelMetaData;
import org.teiid.adminapi.impl.VDBMetaData;
@@ -44,23 +47,28 @@
import org.teiid.deployers.TeiidAttachments;
import org.teiid.deployers.UDFMetaData;
import org.teiid.deployers.VDBRepository;
+import org.teiid.deployers.VDBStatusChecker;
import org.teiid.dqp.internal.datamgr.TranslatorRepository;
+import org.teiid.jboss.VDBService.TranslatorNotFoundException;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.metadata.index.IndexMetadataFactory;
import org.teiid.query.ObjectReplicator;
import org.teiid.runtime.RuntimePlugin;
import org.teiid.services.BufferServiceImpl;
+import org.teiid.translator.ExecutionFactory;
class VDBDeployer implements DeploymentUnitProcessor {
private static final String JAVA_CONTEXT = "java:/"; //$NON-NLS-1$
private TranslatorRepository translatorRepository;
private String asyncThreadPoolName;
+ private VDBStatusChecker vdbStatusChecker;
- public VDBDeployer (TranslatorRepository translatorRepo, String poolName) {
+ public VDBDeployer (TranslatorRepository translatorRepo, String poolName, VDBStatusChecker vdbStatusChecker) {
this.translatorRepository = translatorRepo;
this.asyncThreadPoolName = poolName;
+ this.vdbStatusChecker = vdbStatusChecker;
}
public void deploy(final DeploymentPhaseContext context) throws DeploymentUnitProcessingException {
@@ -69,7 +77,7 @@
return;
}
final String deploymentName = deploymentUnit.getName();
- VDBMetaData deployment = deploymentUnit.getAttachment(TeiidAttachments.VDB_METADATA);
+ final VDBMetaData deployment = deploymentUnit.getAttachment(TeiidAttachments.VDB_METADATA);
// check to see if there is old vdb already deployed.
final ServiceController<?> controller = context.getServiceRegistry().getService(TeiidServiceNames.vdbServiceName(deployment.getName(), deployment.getVersion()));
@@ -119,18 +127,38 @@
// build a VDB service
ArrayList<String> unAvailableDS = new ArrayList<String>();
VDBService vdb = new VDBService(deployment);
- ServiceBuilder<VDBMetaData> vdbService = context.getServiceTarget().addService(TeiidServiceNames.vdbServiceName(deployment.getName(), deployment.getVersion()), vdb);
+ final ServiceBuilder<VDBMetaData> vdbService = context.getServiceTarget().addService(TeiidServiceNames.vdbServiceName(deployment.getName(), deployment.getVersion()), vdb);
for (ModelMetaData model:deployment.getModelMetaDatas().values()) {
for (String sourceName:model.getSourceNames()) {
- //TODO: need to make the service as dependency; otherwise dynamic vdbs will not work correctly.
- //vdbService.addDependency(ServiceName.JBOSS.append("data-source", model.getSourceConnectionJndiName(sourceName))); //$NON-NLS-1$
if (!isSourceAvailable(model.getSourceConnectionJndiName(sourceName))) {
unAvailableDS.add(model.getSourceConnectionJndiName(sourceName));
}
}
}
+ // add dependencies to data-sources
+ dataSourceDependencies(deployment, new DependentServices() {
+ @Override
+ public void serviceFound(String dsName, ServiceName svcName) {
+ DataSourceListener dsl = new DataSourceListener(dsName, svcName, vdbStatusChecker);
+ ServiceBuilder<DataSourceListener> sb = context.getServiceTarget().addService(TeiidServiceNames.dsListenerServiceName(dsName), dsl);
+ sb.addDependency(svcName);
+ sb.setInitialMode(Mode.PASSIVE).install();
+ }
+ });
+
// adding the translator services is redundant, however if one is removed then it is an issue.
+ for (Model model:deployment.getModels()) {
+ List<String> sourceNames = model.getSourceNames();
+ for (String sourceName:sourceNames) {
+ String translatorName = model.getSourceTranslatorName(sourceName);
+ if (!deployment.isOverideTranslator(translatorName)) {
+ vdbService.addDependency(TeiidServiceNames.translatorServiceName(translatorName));
+ }
+ }
+ }
+
+ //override translators (if any)
for (Translator t: deployment.getOverrideTranslators()) {
VDBTranslatorMetaData data = (VDBTranslatorMetaData)t;
String type = data.getType();
@@ -149,13 +177,79 @@
LogManager.logInfo(LogConstants.CTX_RUNTIME, IntegrationPlugin.Util.getString("vdb-inactive", deployment.getName(), deployment.getVersion(), unAvailableDS)); //$NON-NLS-1$
}
}
+
+ private void dataSourceDependencies(VDBMetaData deployment, DependentServices svcListener) throws DeploymentUnitProcessingException {
+
+ for (ModelMetaData model:deployment.getModelMetaDatas().values()) {
+ for (String sourceName:model.getSourceNames()) {
+ String translatorName = model.getSourceTranslatorName(sourceName);
+ if (deployment.isOverideTranslator(translatorName)) {
+ VDBTranslatorMetaData translator = deployment.getTranslator(translatorName);
+ translatorName = translator.getType();
+ }
+
+ boolean jdbcSource = true;
+ try {
+ ExecutionFactory ef = VDBService.getExecutionFactory(translatorName, new TranslatorRepository(), this.translatorRepository, deployment, new IdentityHashMap<Translator, ExecutionFactory<Object, Object>>(), new HashSet<String>());
+ jdbcSource = ef.isJDBCSource();
+ } catch (TranslatorNotFoundException e) {
+ if (e.getCause() != null) {
+ throw new DeploymentUnitProcessingException(e.getCause());
+ }
+ throw new DeploymentUnitProcessingException(e.getMessage());
+ }
+ // Need to make the data source service as dependency; otherwise dynamic vdbs will not work correctly.
+ String dsName = model.getSourceConnectionJndiName(sourceName);
+ ServiceName svcName = ServiceName.JBOSS.append("data-source", getJndiName(dsName)); //$NON-NLS-1$
+ if (!jdbcSource) {
+ // TODO: add service dependency on connection-factory (this is pending in AS7)
+ svcName = ServiceName.JBOSS.append("resource-adaptor", getJndiName(dsName)); //$NON-NLS-1$
+ }
+
+ svcListener.serviceFound(dsName, svcName);
+ }
+ }
+ }
+
+ interface DependentServices {
+ void serviceFound(String dsName, ServiceName svc);
+ }
+
+ static class DataSourceListener implements Service<DataSourceListener>{
+ private VDBStatusChecker vdbStatusChecker;
+ private String dsName;
+ private ServiceName svcName;
+
+ public DataSourceListener(String dsName, ServiceName svcName, VDBStatusChecker checker) {
+ this.dsName = dsName;
+ this.svcName = svcName;
+ this.vdbStatusChecker = checker;
+ }
+
+ public DataSourceListener getValue() throws IllegalStateException,IllegalArgumentException {
+ return this;
+ }
+ @Override
+ public void start(StartContext context) throws StartException {
+ ServiceController s = context.getController().getServiceContainer().getService(this.svcName);
+ if (s != null) {
+ this.vdbStatusChecker.dataSourceAdded(this.dsName);
+ }
+ }
+
+ @Override
+ public void stop(StopContext context) {
+ ServiceController s = context.getController().getServiceContainer().getService(this.svcName);
+ if (s.getMode().equals(Mode.REMOVE) || s.getState().equals(State.STOPPING)) {
+ this.vdbStatusChecker.dataSourceRemoved(this.dsName);
+ }
+ }
+ }
+
private boolean isSourceAvailable(String name) {
- String jndiName = name;
- if (!name.startsWith(JAVA_CONTEXT)) {
- jndiName = JAVA_CONTEXT + jndiName;
- }
+ String jndiName = getJndiName(name);
try {
InitialContext ic = new InitialContext();
try {
@@ -169,6 +263,14 @@
return false;
}
return true;
+ }
+
+ private String getJndiName(String name) {
+ String jndiName = name;
+ if (!name.startsWith(JAVA_CONTEXT)) {
+ jndiName = JAVA_CONTEXT + jndiName;
+ }
+ return jndiName;
}
@Override
@@ -177,12 +279,25 @@
return;
}
- VDBMetaData deployment = deploymentUnit.getAttachment(TeiidAttachments.VDB_METADATA);
+ final VDBMetaData deployment = deploymentUnit.getAttachment(TeiidAttachments.VDB_METADATA);
final ServiceController<?> controller = deploymentUnit.getServiceRegistry().getService(TeiidServiceNames.vdbServiceName(deployment.getName(), deployment.getVersion()));
if (controller != null) {
VDBService vdbService = (VDBService)controller.getService();
vdbService.undeployInProgress();
+ try {
+ dataSourceDependencies(deployment, new DependentServices() {
+ @Override
+ public void serviceFound(String dsName, ServiceName svcName) {
+ ServiceController<?> controller = deploymentUnit.getServiceRegistry().getService(TeiidServiceNames.dsListenerServiceName(dsName));
+ if (controller != null) {
+ controller.setMode(ServiceController.Mode.REMOVE);
+ }
+ }
+ });
+ } catch (DeploymentUnitProcessingException e) {
+ LogManager.logDetail(LogConstants.CTX_RUNTIME, e, IntegrationPlugin.Util.getString("vdb-undeploy-failed", deployment.getName(), deployment.getVersion())); //$NON-NLS-1$
+ }
controller.setMode(ServiceController.Mode.REMOVE);
}
}
Modified: branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBService.java
===================================================================
--- branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBService.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/jboss-integration/src/main/java/org/teiid/jboss/VDBService.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -234,24 +234,31 @@
String name = model.getSourceTranslatorName(source);
ConnectorManager cm = new ConnectorManager(name, model.getSourceConnectionJndiName(source));
- ExecutionFactory<Object, Object> ef = getExecutionFactory(name, repo, deployment, map, new HashSet<String>());
- cm.setExecutionFactory(ef);
- cm.setModelName(model.getName());
- cmr.addConnectorManager(source, cm);
+ try {
+ ExecutionFactory<Object, Object> ef = getExecutionFactory(name, repo, getTranslatorRepository(), deployment, map, new HashSet<String>());
+ cm.setExecutionFactory(ef);
+ cm.setModelName(model.getName());
+ cmr.addConnectorManager(source, cm);
+ } catch (TranslatorNotFoundException e) {
+ if (e.getCause() != null) {
+ throw new StartException(e.getCause());
+ }
+ throw new StartException(e.getMessage());
+ }
}
}
}
- private ExecutionFactory<Object, Object> getExecutionFactory(String name, TranslatorRepository repo, VDBMetaData deployment, IdentityHashMap<Translator, ExecutionFactory<Object, Object>> map, HashSet<String> building) throws StartException {
+ static ExecutionFactory<Object, Object> getExecutionFactory(String name, TranslatorRepository vdbRepo, TranslatorRepository repo, VDBMetaData deployment, IdentityHashMap<Translator, ExecutionFactory<Object, Object>> map, HashSet<String> building) throws TranslatorNotFoundException {
if (!building.add(name)) {
- throw new StartException(RuntimePlugin.Util.getString("recursive_delegation", deployment.getName(), deployment.getVersion(), building)); //$NON-NLS-1$
+ throw new TranslatorNotFoundException(RuntimePlugin.Util.getString("recursive_delegation", deployment.getName(), deployment.getVersion(), building)); //$NON-NLS-1$
}
- VDBTranslatorMetaData translator = repo.getTranslatorMetaData(name);
+ VDBTranslatorMetaData translator = vdbRepo.getTranslatorMetaData(name);
if (translator == null) {
- translator = getTranslatorRepository().getTranslatorMetaData(name);
+ translator = repo.getTranslatorMetaData(name);
}
if (translator == null) {
- throw new StartException(RuntimePlugin.Util.getString("translator_not_found", deployment.getName(), deployment.getVersion(), name)); //$NON-NLS-1$
+ throw new TranslatorNotFoundException(RuntimePlugin.Util.getString("translator_not_found", deployment.getName(), deployment.getVersion(), name)); //$NON-NLS-1$
}
try {
ExecutionFactory<Object, Object> ef = map.get(translator);
@@ -261,7 +268,7 @@
DelegatingExecutionFactory delegator = (DelegatingExecutionFactory)ef;
String delegateName = delegator.getDelegateName();
if (delegateName != null) {
- ExecutionFactory<Object, Object> delegate = getExecutionFactory(delegateName, repo, deployment, map, building);
+ ExecutionFactory<Object, Object> delegate = getExecutionFactory(delegateName, vdbRepo, repo, deployment, map, building);
((DelegatingExecutionFactory) ef).setDelegate(delegate);
}
}
@@ -269,7 +276,7 @@
}
return ef;
} catch(TeiidException e) {
- throw new StartException(e);
+ throw new TranslatorNotFoundException(e);
}
}
@@ -527,4 +534,14 @@
throw new AdminProcessingException(e);
}
}
+
+ @SuppressWarnings("serial")
+ static class TranslatorNotFoundException extends TeiidException {
+ public TranslatorNotFoundException(String msg) {
+ super(msg);
+ }
+ public TranslatorNotFoundException(Throwable t) {
+ super(t);
+ }
+ }
}
Modified: branches/as7/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties
===================================================================
--- branches/as7/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/jboss-integration/src/main/resources/org/teiid/jboss/i18n.properties 2011-10-20 17:09:57 UTC (rev 3574)
@@ -62,6 +62,7 @@
wrong_protocol=Wrong type of protocol supplied
socket_binding_not_defined=Socket binding not specified for transport {0}; only embedded access is granted.
embedded_enabled=Teiid Embedded transport enabled.
+vdb-undeploy-failed=error during the undeploy of vdb {0}.{1}
# subsystem description
teiid.add = Add the Teiid Subsystem
Modified: branches/as7/pom.xml
===================================================================
--- branches/as7/pom.xml 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/pom.xml 2011-10-20 17:09:57 UTC (rev 3574)
@@ -10,7 +10,7 @@
<properties>
<ant.version>1.7.0</ant.version>
<site.url>http://www.jboss.org/teiid</site.url>
- <jbossas-version>7.0.1.Final</jbossas-version>
+ <jbossas-version>7.1.0.Alpha1-SNAPSHOT</jbossas-version>
</properties>
<scm>
<connection>scm:svn:https://anonsvn.jboss.org/repos/teiid/trunk</connection>
Modified: branches/as7/runtime/src/main/java/org/teiid/deployers/VDBStatusChecker.java
===================================================================
--- branches/as7/runtime/src/main/java/org/teiid/deployers/VDBStatusChecker.java 2011-10-20 15:37:18 UTC (rev 3573)
+++ branches/as7/runtime/src/main/java/org/teiid/deployers/VDBStatusChecker.java 2011-10-20 17:09:57 UTC (rev 3574)
@@ -22,8 +22,9 @@
package org.teiid.deployers;
import java.util.LinkedList;
+import java.util.concurrent.Executor;
-import org.jboss.util.threadpool.ThreadPool;
+import org.jboss.msc.value.InjectedValue;
import org.teiid.adminapi.Model;
import org.teiid.adminapi.VDB;
import org.teiid.adminapi.impl.ModelMetaData;
@@ -36,14 +37,14 @@
public class VDBStatusChecker {
- private static final String JAVA_CONTEXT = "java:"; //$NON-NLS-1$
+ private static final String JAVA_CONTEXT = "java:/"; //$NON-NLS-1$
private VDBRepository vdbRepository;
- private ThreadPool threadPool;
+ private final InjectedValue<Executor> executorInjector = new InjectedValue<Executor>();
- public VDBStatusChecker(VDBRepository vdbRepository, ThreadPool threadPool) {
+ public VDBStatusChecker(VDBRepository vdbRepository) {
this.vdbRepository = vdbRepository;
- this.threadPool = threadPool;
}
+
public void translatorAdded(String translatorName) {
resourceAdded(translatorName, true);
}
@@ -113,7 +114,7 @@
if (!runnables.isEmpty()) {
//the task themselves will set the status on completion/failure
for (Runnable runnable : runnables) {
- this.threadPool.run(runnable);
+ getExecutor().execute(runnable);
}
} else if (valid) {
vdb.setStatus(VDB.Status.ACTIVE);
@@ -140,7 +141,7 @@
msg = RuntimePlugin.Util.getString("translator_not_found", vdb.getName(), vdb.getVersion(), model.getSourceTranslatorName(sourceName)); //$NON-NLS-1$
}
else {
- msg = RuntimePlugin.Util.getString("datasource_not_found", vdb.getName(), vdb.getVersion(), model.getSourceTranslatorName(sourceName)); //$NON-NLS-1$
+ msg = RuntimePlugin.Util.getString("datasource_not_found", vdb.getName(), vdb.getVersion(), resourceName); //$NON-NLS-1$
}
model.addError(ModelMetaData.ValidationError.Severity.ERROR.name(), msg);
LogManager.logInfo(LogConstants.CTX_RUNTIME, msg);
@@ -169,4 +170,12 @@
}
return null;
}
+
+ public InjectedValue<Executor> getExecutorInjector(){
+ return this.executorInjector;
+ }
+
+ private Executor getExecutor() {
+ return this.executorInjector.getValue();
+ }
}
13 years, 2 months
teiid SVN: r3573 - in trunk/engine/src: test/java/org/teiid/common/buffer/impl and 1 other directory.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-10-20 11:37:18 -0400 (Thu, 20 Oct 2011)
New Revision: 3573
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
Log:
TEIID-1750 fixing a bug with positional filestore writes and fixing size indexing.
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockInputStream.java 2011-10-20 15:37:18 UTC (rev 3573)
@@ -34,12 +34,10 @@
int blockIndex;
ByteBuffer buf;
boolean done;
- private final boolean threadSafe;
- BlockInputStream(BlockManager manager, int blockCount, boolean threadSafe) {
+ BlockInputStream(BlockManager manager, int blockCount) {
this.manager = manager;
this.maxBlock = blockCount;
- this.threadSafe = threadSafe;
}
@Override
@@ -58,9 +56,6 @@
return;
}
buf = manager.getBlock(blockIndex++);
- if (threadSafe) {
- buf = buf.duplicate();
- }
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-20 15:37:18 UTC (rev 3573)
@@ -405,12 +405,9 @@
List<BlockStore> stores = new ArrayList<BlockStore>();
int size = BLOCK_SIZE;
do {
- if ((size>>1) >= maxStorageObjectSize) {
- size>>=1; //adjust the last block size if needed
- }
- stores.add(new BlockStore(this.storageManager, size, 15, BufferManagerImpl.CONCURRENCY_LEVEL>>2));
- size <<=2;
- } while (size>>2 < maxStorageObjectSize);
+ stores.add(new BlockStore(this.storageManager, size, 30, BufferManagerImpl.CONCURRENCY_LEVEL));
+ size <<=1;
+ } while ((size>>1) < maxStorageObjectSize);
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
}
@@ -503,7 +500,6 @@
}
} catch (Throwable e) {
if (e == PhysicalInfo.sizeChanged) {
- //System.out.println("size changed " + info.inode + " " + info.block + " " + info);
//entries are mutable after adding, the original should be removed shortly so just ignore
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId() +" changed size since first persistence, keeping the original."); //$NON-NLS-1$ //$NON-NLS-2$
} else {
@@ -587,7 +583,7 @@
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
}
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
- is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
+ is = new BlockInputStream(manager, info.memoryBlockCount);
} else if (info.block != EMPTY_ADDRESS) {
memoryBufferEntries.recordAccess(info);
storageReads.incrementAndGet();
@@ -720,7 +716,7 @@
try {
if (demote && block == EMPTY_ADDRESS) {
storageWrites.getAndIncrement();
- BlockInputStream is = new BlockInputStream(bm, memoryBlockCount, false); //we know this can always be single threaded
+ BlockInputStream is = new BlockInputStream(bm, memoryBlockCount);
BlockStore blockStore = sizeBasedStores[sizeIndex];
block = getAndSetNextClearBit(blockStore);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
@@ -930,9 +926,9 @@
return; //no changes
}
this.memoryBlockCount = newMemoryBlockCount;
- while (newMemoryBlockCount >= 1) {
+ while (newMemoryBlockCount > 1) {
this.sizeIndex++;
- newMemoryBlockCount>>=2;
+ newMemoryBlockCount = (newMemoryBlockCount>>1) + ((newMemoryBlockCount&0x01)==0?0:1);
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-20 15:37:18 UTC (rev 3573)
@@ -323,7 +323,7 @@
}
}
- static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable
+ static final int CONCURRENCY_LEVEL = 32; //TODO: make this configurable since it is roughly the same as max active plans
private static final int TARGET_BYTES_PER_ROW = 1 << 11; //2k bytes per row
private static ReferenceQueue<CacheEntry> SOFT_QUEUE = new ReferenceQueue<CacheEntry>();
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ConcurrentBitSet.java 2011-10-20 15:37:18 UTC (rev 3573)
@@ -180,7 +180,7 @@
}
/**
- * Set to try to always allocate against the first available block in a segment.
+ * Set to always allocate against the first available block in a segment.
* @param compact
*/
public void setCompact(boolean compact) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-10-20 15:37:18 UTC (rev 3573)
@@ -133,11 +133,10 @@
if (bytesUsed > 0) {
long used = usedBufferSpace.addAndGet(bytesUsed);
if (used > maxBufferSpace) {
- usedBufferSpace.addAndGet(-bytesUsed);
//TODO: trigger a compaction before this is thrown
throw new IOException(QueryPlugin.Util.getString("FileStoreageManager.space_exhausted", maxBufferSpace)); //$NON-NLS-1$
}
- fileAccess.setLength(bytesUsed);
+ fileAccess.setLength(newLength);
bytesUsed = 0;
}
fileAccess.seek(fileOffset);
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-20 15:37:18 UTC (rev 3573)
@@ -188,4 +188,26 @@
return fsc;
}
+ @Test public void testSizeIndex() throws Exception {
+ PhysicalInfo info = new PhysicalInfo(1l, 1l, -1, 0);
+ info.setSize(1<<13);
+ assertEquals(0, info.sizeIndex);
+
+ info = new PhysicalInfo(1l, 1l, -1, 0);
+ info.setSize(1 + (1<<13));
+ assertEquals(1, info.sizeIndex);
+
+ info = new PhysicalInfo(1l, 1l, -1, 0);
+ info.setSize(2 + (1<<15));
+ assertEquals(3, info.sizeIndex);
+ }
+
+ @Test(expected=Exception.class) public void testSizeChanged() throws Exception {
+ PhysicalInfo info = new PhysicalInfo(1l, 1l, -1, 0);
+ info.setSize(1<<13);
+ assertEquals(0, info.sizeIndex);
+
+ info.setSize(1 + (1<<13));
+ }
+
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-10-20 14:41:28 UTC (rev 3572)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStorageManager.java 2011-10-20 15:37:18 UTC (rev 3573)
@@ -59,6 +59,25 @@
store.remove();
assertEquals(0, sm.getUsedBufferSpace());
}
+
+ @Test public void testPositionalWrite() throws Exception {
+ FileStorageManager sm = getStorageManager(null, null);
+ String tsID = "0"; //$NON-NLS-1$
+ FileStore store = sm.createFileStore(tsID);
+ byte[] expectedBytes = writeBytes(store, 2048);
+ assertEquals(4096, sm.getUsedBufferSpace());
+
+ writeBytes(store, 4096);
+ assertEquals(6144, sm.getUsedBufferSpace());
+
+ byte[] bytesRead = new byte[2048];
+ store.readFully(2048, bytesRead, 0, bytesRead.length);
+
+ assertArrayEquals(expectedBytes, bytesRead);
+
+ store.remove();
+ assertEquals(0, sm.getUsedBufferSpace());
+ }
@Test(expected=IOException.class) public void testMaxSpace() throws Exception {
FileStorageManager sm = getStorageManager(null, null);
@@ -81,15 +100,19 @@
static Random r = new Random();
- static void writeBytes(FileStore store)
+ static void writeBytes(FileStore store) throws IOException {
+ writeBytes(store, store.getLength());
+ }
+
+ static byte[] writeBytes(FileStore store, long start)
throws IOException {
byte[] bytes = new byte[2048];
r.nextBytes(bytes);
- long start = store.getLength();
- store.write(bytes, 0, bytes.length);
+ store.write(start, bytes, 0, bytes.length);
byte[] bytesRead = new byte[2048];
store.readFully(start, bytesRead, 0, bytesRead.length);
assertTrue(Arrays.equals(bytes, bytesRead));
+ return bytes;
}
@Test public void testWritingMultipleFiles() throws Exception {
13 years, 2 months
teiid SVN: r3572 - trunk/documentation/admin-guide/src/main/docbook/en-US/content.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-10-20 10:41:28 -0400 (Thu, 20 Oct 2011)
New Revision: 3572
Modified:
trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
Log:
TEIID-1750 correcting xml
Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
===================================================================
--- trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml 2011-10-20 01:51:50 UTC (rev 3571)
+++ trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml 2011-10-20 14:41:28 UTC (rev 3572)
@@ -84,7 +84,7 @@
With large off-heap buffer sizes (greater than several gigabytes) you may also need to adjust VM settings. For Sun VMs the relevant VM settings are MaxDirectMemorySize and UseLargePages. For example adding:
<programlisting>-XX:MaxDirectMemorySize=12g -XX:+UseLargePages</programlisting>
to the VM process arguments would allow for an effective allocation of approximately an 11GB Teiid memory buffer (the memoryBufferSpace setting) accounting for any additional direct memory that may be needed
- by the AS or applications running in the AS.
+ by the AS or applications running in the AS.</para>
</section>
<section>
<title>Disk Usage</title>
13 years, 2 months
teiid SVN: r3571 - in trunk: build/kits/jboss-container/deploy/teiid and 6 other directories.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-10-19 21:51:50 -0400 (Wed, 19 Oct 2011)
New Revision: 3571
Modified:
trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
trunk/build/kits/jboss-container/teiid-releasenotes.html
trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
Log:
TEIID-1750 lowering batch sizes, removing the small byte buffer slices in favor of absolute positioning
Modified: trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml
===================================================================
--- trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/build/kits/jboss-container/deploy/teiid/teiid-jboss-beans.xml 2011-10-20 01:51:50 UTC (rev 3571)
@@ -24,10 +24,10 @@
<property name="useDisk">true</property>
<!-- Directory location for the buffer files -->
<property name="diskDirectory">${jboss.server.temp.dir}/teiid</property>
- <!-- The max row count of a batch sent internally within the query processor. Should be <= the connectorBatchSize. (default 512) -->
- <property name="processorBatchSize">512</property>
- <!-- The max row count of a batch from a connector. Should be even multiple of processorBatchSize. (default 1024) -->
- <property name="connectorBatchSize">1024</property>
+ <!-- The max row count of a batch sent internally within the query processor. Should be <= the connectorBatchSize. (default 256) -->
+ <property name="processorBatchSize">256</property>
+ <!-- The max row count of a batch from a connector. Should be even multiple of processorBatchSize. (default 512) -->
+ <property name="connectorBatchSize">512</property>
<!--
The approximate amount of buffer memory in kilobytes allowable for a single processing operation (sort, grouping, etc.) regardless of existing memory commitments. -1 means to automatically calculate a value (default -1).
See the admin guide for more.
Modified: trunk/build/kits/jboss-container/teiid-releasenotes.html
===================================================================
--- trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/build/kits/jboss-container/teiid-releasenotes.html 2011-10-20 01:51:50 UTC (rev 3571)
@@ -30,10 +30,10 @@
<LI><B>File Enhancements</B> - the file translator can now optionally (via the ExceptionIfFileNotFound property) throw an exception if the path refers to a file that doesn't exist. The file resource adapter can be configured to map file names and can prevent parent path .. references. See the Admin Guide or the file-ds.xml template for more.
<LI><B>TEXTTABLE Enhancements</B> - TEXTTABLE can now parse fixed width files that do not use a row delimiter and can optionally produce fixed values that haven't been trimmed.
<LI><B>Temp table transactions</B> - Internal materialized views and temp table usage from a session and within procedures can take advantage of greater transaction support.
- <LI><B>Buffering Improvements</B> - Added the ability to inline memory based or small lobs and added tracking of the memory held by soft references. Also switched to a LFRU algorithm that significantly reduces writes and read misses with temporary tables.
+ <LI><B>Buffering Improvements</B> - Added the ability to inline memory based or small lobs and added tracking of the memory held by soft references. Also switched to a concurrent LFRU algorithm that significantly reduces writes and read misses with temporary tables. Added a memory buffer to better handle file storage.
+ The memory buffer may be optionally configured as off-heap for better large memory performance.
<LI><B>GSSAPI</B> - both the Teiid JDBC client/server and the ODBC pg backend can now support GSSAPI for single sign-on.
<LI><B>Server-side Query Timeouts</B> - default query timeouts can be configured at both the VDB (via the query-timeout VDB property) and entire server (via the teiid-jboss-beans.xml queryTimeout property).
- <LI><B>Memory Improvements</B> - buffering was optimized for concurrency and to better handle table querying instead of tuple buffers. Added a memory buffer to better handle file storage. The memory buffer may be optional configured as off-heap for better large memory performance.
</UL>
<h2><a name="Compatibility">Compatibility Issues</a></h2>
@@ -119,6 +119,11 @@
<h4>from 7.4</h4>
<ul>
+ <li>The configuration for the buffer service now defaults to 256/512 for processor and connector batch sizes respectively. The buffer service also has 4 new properties inlineLobs, memoryBufferSpace, memoryBufferOffHeap, and maxStorageObjectSize.
+</ul>
+
+<h4>from 7.4</h4>
+<ul>
<LI>The configuration for authorization has been moved off of the RuntimeEngineDeployer bean and onto separate AuthorizationValidator and PolicyDecider beans.
<LI>The configuration for the buffer manager has been simplified to refer to memory sizes in KB, rather than batch columns.
</ul>
Modified: trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml
===================================================================
--- trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/documentation/admin-guide/src/main/docbook/en-US/content/performance.xml 2011-10-20 01:51:50 UTC (rev 3571)
@@ -67,7 +67,7 @@
<title>Big Data/Memory</title>
<para>Usage of extremely large VM sizes and/or datasets requires additional considerations.
Teiid has a non-negligible amount of overhead per batch/table page on the order of 100-200 bytes. Depending on the data types involved each
- full batch/table page will represent between 64 and 4096 rows. If you are dealing with datasets with billions of rows and you run into OutOfMemory issues, consider increasing the processor
+ full batch/table page will represent a variable number of rows (a power of two multiple above or below the processor batch size). If you are dealing with datasets with billions of rows and you run into OutOfMemory issues, consider increasing the processor
batch size in the &jboss-beans; file to force the allocation of larger batches and table pages. If the processor batch size is increased and/or you are dealing with extremely wide result sets (several hundred columns),
then the default setting of 8MB for the maxStorageObjectSize in the &jboss-beans; file may be too low. The sizing for maxStorageObjectSize is in terms of serialized size, which will be much
closer to the raw data size than the Java memory footprint estimation used for maxReservedKB.
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/BufferManager.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -61,8 +61,8 @@
NO_WAIT
}
- public static int DEFAULT_CONNECTOR_BATCH_SIZE = 1024;
- public static int DEFAULT_PROCESSOR_BATCH_SIZE = 512;
+ public static int DEFAULT_CONNECTOR_BATCH_SIZE = 512;
+ public static int DEFAULT_PROCESSOR_BATCH_SIZE = 256;
public static int DEFAULT_MAX_PROCESSING_KB = -1;
public static int DEFAULT_RESERVE_BUFFER_KB = -1;
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -49,8 +49,6 @@
@SuppressWarnings("unchecked")
class SPage implements Cloneable {
- static final int MIN_PERSISTENT_SIZE = 16;
-
static class SearchResult {
int index;
SPage page;
@@ -186,7 +184,7 @@
if (values instanceof LightWeightCopyOnWriteList<?>) {
values = ((LightWeightCopyOnWriteList<List<?>>)values).getList();
}
- if (values.size() < MIN_PERSISTENT_SIZE) {
+ if (values.size() < stree.minPageSize) {
setDirectValues(values);
return;
} else if (stree.batchInsert && children == null && values.size() < stree.leafSize) {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -65,6 +65,7 @@
protected ListNestedSortComparator comparator;
private int pageSize;
protected int leafSize;
+ protected int minPageSize;
protected int keyLength;
protected boolean batchInsert;
protected SPage incompleteInsert;
@@ -86,7 +87,7 @@
manager.setPrefersMemory(true);
this.leafManager = leafManager;
this.comparator = comparator;
- this.pageSize = Math.max(pageSize, SPage.MIN_PERSISTENT_SIZE);
+ this.pageSize = pageSize;
pageSize >>>= 3;
while (pageSize > 0) {
pageSize >>>= 1;
@@ -97,6 +98,7 @@
this.leafSize = leafSize;
this.keyLength = keyLength;
this.lobManager = lobManager;
+ this.minPageSize = this.pageSize>>5;
}
public STree clone() {
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockByteBuffer.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -42,22 +42,12 @@
}
}
- private static class BlockInfo {
- final ByteBuffer bb;
- final int block;
- public BlockInfo(ByteBuffer bb, int block) {
- this.bb = bb;
- this.block = block;
- }
- }
-
private int blockAddressBits;
private int segmentAddressBits;
private int segmentSize;
private int blockSize;
private int blockCount;
private ThreadLocal<ByteBuffer>[] buffers;
- private BlockInfo[] bufferCache;
/**
* Creates a new {@link BlockByteBuffer} where each buffer segment will be
@@ -88,8 +78,6 @@
if (lastSegmentSize > 0) {
buffers[fullSegments] = new ThreadLocalByteBuffer(allocate(lastSegmentSize, direct));
}
- int logSize = 32 - Integer.numberOfLeadingZeros(blockCount);
- bufferCache = new BlockInfo[Math.min(logSize, 20)];
}
public static ByteBuffer allocate(int size, boolean direct) {
@@ -106,7 +94,7 @@
}
/**
- * Return a buffer containing the given start byte.
+ * Return a buffer positioned at the given start byte.
* It is assumed that the caller will handle blocks in
* a thread safe manner.
* @param startIndex
@@ -116,21 +104,12 @@
if (block < 0 || block >= blockCount) {
throw new IndexOutOfBoundsException("Invalid block " + block); //$NON-NLS-1$
}
- int cacheIndex = block&(bufferCache.length -1);
- BlockInfo info = bufferCache[cacheIndex];
- if (info != null && info.block == block) {
- info.bb.rewind();
- return info.bb;
- }
int segment = block>>(segmentAddressBits-blockAddressBits);
ByteBuffer bb = buffers[segment].get();
- bb.limit(bb.capacity());
+ bb.rewind();
int position = (block<<blockAddressBits)&(segmentSize-1);
- bb.position(position);
bb.limit(position + blockSize);
- bb = bb.slice();
- info = new BlockInfo(bb, block);
- bufferCache[cacheIndex] = info;
+ bb.position(position);
return bb;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -53,7 +53,6 @@
import org.teiid.core.TeiidComponentException;
import org.teiid.core.TeiidRuntimeException;
import org.teiid.core.util.ExecutorUtils;
-import org.teiid.core.util.ObjectConverterUtil;
import org.teiid.logging.LogConstants;
import org.teiid.logging.LogManager;
import org.teiid.logging.MessageLevel;
@@ -166,27 +165,27 @@
if (index >= MAX_INDIRECT) {
position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
ByteBuffer next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT, value, mode);
- if (next != info) {
+ if (next != null) {
info = next;
//should have traversed to the secondary
int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
- position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
- if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_SIZE) {
+ position = info.position() + indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < info.limit()) {
info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
}
next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT + indirectAddressBlock * ADDRESSES_PER_BLOCK, value, mode);
- if (next != info) {
+ if (next != null) {
info = next;
- position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
+ position = info.position() + ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
}
}
} else if (index >= DIRECT_POINTERS) {
//indirect
position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
ByteBuffer next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS, value, mode);
- if (next != info) {
+ if (next != null) {
info = next;
- position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
+ position = next.position() + (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
}
} else {
position = BYTES_PER_BLOCK_ADDRESS*index;
@@ -194,7 +193,7 @@
if (mode == Mode.ALLOCATE) {
dataBlock = nextBlock(true);
info.putInt(position, dataBlock);
- if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < BLOCK_SIZE) {
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS < info.limit()) {
//maintain the invariant that the next pointer is empty
info.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
}
@@ -215,7 +214,7 @@
buf.putInt(position, sib_index);
} else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
freeDataBlock(sib_index);
- return buf;
+ return null;
}
}
return blockByteBuffer.getByteBuffer(sib_index);
@@ -273,7 +272,7 @@
ByteBuffer bb = getInodeBlock();
bb.putInt(EMPTY_ADDRESS);
}
- inodeBuffer = inodeByteBuffer.getByteBuffer(inode);
+ inodeBuffer = inodeByteBuffer.getByteBuffer(inode).slice();
}
return inodeBuffer;
}
@@ -299,7 +298,7 @@
if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
}
- bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock);
+ bb = blockByteBuffer.getByteBuffer(doublyIndirectIndexBlock).slice();
freeBlock(0, bb, ADDRESSES_PER_BLOCK, false);
freeDataBlock(doublyIndirectIndexBlock);
return acquire?dataBlockToAcquire:EMPTY_ADDRESS;
@@ -307,7 +306,7 @@
private boolean freeIndirectBlock(int indirectIndexBlock) {
ByteBuffer bb = blockByteBuffer.getByteBuffer(indirectIndexBlock);
- boolean freedAll = freeBlock(0, bb, ADDRESSES_PER_BLOCK, true);
+ boolean freedAll = freeBlock(bb.position(), bb, ADDRESSES_PER_BLOCK, true);
freeDataBlock(indirectIndexBlock);
return freedAll;
}
@@ -409,7 +408,7 @@
if ((size>>1) >= maxStorageObjectSize) {
size>>=1; //adjust the last block size if needed
}
- stores.add(new BlockStore(this.storageManager, size, 30, BufferManagerImpl.CONCURRENCY_LEVEL>>2));
+ stores.add(new BlockStore(this.storageManager, size, 15, BufferManagerImpl.CONCURRENCY_LEVEL>>2));
size <<=2;
} while (size>>2 < maxStorageObjectSize);
this.sizeBasedStores = stores.toArray(new BlockStore[stores.size()]);
@@ -449,20 +448,22 @@
if (!map.containsKey(entry.getId())) {
return true; //already removed
}
- info = new PhysicalInfo(s.getId(), entry.getId(), EMPTY_ADDRESS);
+ info = new PhysicalInfo(s.getId(), entry.getId(), EMPTY_ADDRESS, (int)readAttempts.get());
+ info.adding = true;
map.put(entry.getId(), info);
}
}
}
if (!newEntry) {
synchronized (info) {
- if (info.inode == EMPTY_ADDRESS && info.block == EMPTY_ADDRESS) {
+ if (info.adding) {
return false; //someone else is responsible for adding this cache entry
}
if (info.evicting || info.inode != EMPTY_ADDRESS
|| !shouldPlaceInMemoryBuffer(0, info)) {
return true; //safe to remove from tier 1
}
+ info.adding = true;
//second chance re-add to the cache, we assume that serialization would be faster than a disk read
}
}
@@ -492,19 +493,35 @@
synchronized (map) {
if (physicalMapping.containsKey(s.getId()) && map.containsKey(entry.getId())) {
synchronized (info) {
+ //set the size first, since it may raise an exceptional condition
+ info.setSize(bos.getBytesWritten());
info.inode = blockManager.getInode();
- info.setSize(bos.getBytesWritten());
- memoryBufferEntries.touch(info, newEntry);
+ memoryBufferEntries.add(info);
}
success = true;
}
}
} catch (Throwable e) {
- LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch, attempts to read batch "+ entry.getId() +" later will result in an exception"); //$NON-NLS-1$ //$NON-NLS-2$
+ if (e == PhysicalInfo.sizeChanged) {
+ //System.out.println("size changed " + info.inode + " " + info.block + " " + info);
+ //entries are mutable after adding, the original should be removed shortly so just ignore
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Object "+ entry.getId() +" changed size since first persistence, keeping the original."); //$NON-NLS-1$ //$NON-NLS-2$
+ } else {
+ LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting, attempts to read "+ entry.getId() +" later will result in an exception."); //$NON-NLS-1$ //$NON-NLS-2$
+ }
} finally {
if (hasPermit) {
memoryWritePermits.release();
}
+ if (info != null) {
+ synchronized (info) {
+ info.adding = false;
+ if (!success && blockManager != null) {
+ //invalidate for safety
+ info.inode = EMPTY_ADDRESS;
+ }
+ }
+ }
if (!success && blockManager != null) {
blockManager.free(false);
}
@@ -558,23 +575,21 @@
if (serializer == null) {
return null;
}
- long currentTime = readAttempts.incrementAndGet();
InputStream is = null;
- boolean inStorage = false;
try {
synchronized (info) {
assert !info.pinned && info.loading; //load should be locked
await(info, true, false); //not necessary, but should make things safer
if (info.inode != EMPTY_ADDRESS) {
info.pinned = true;
- memoryBufferEntries.touch(info, false);
+ memoryBufferEntries.touch(info);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at inode", info.inode, serializer.getId(), oid); //$NON-NLS-1$
}
BlockManager manager = getBlockManager(serializer.getId(), oid, info.inode);
is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
} else if (info.block != EMPTY_ADDRESS) {
- inStorage = true;
+ memoryBufferEntries.recordAccess(info);
storageReads.incrementAndGet();
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Getting object at block", info.block, info.sizeIndex, serializer.getId(), oid); //$NON-NLS-1$
@@ -587,28 +602,6 @@
return null;
}
}
- if (inStorage && shouldPlaceInMemoryBuffer(currentTime, info) && this.memoryWritePermits.tryAcquire()) {
- BlockManager manager = null;
- boolean success = false;
- try {
- manager = getBlockManager(info.gid, info.getId(), EMPTY_ADDRESS);
- ExtensibleBufferedOutputStream os = new BlockOutputStream(manager);
- ObjectConverterUtil.write(os, is, -1);
- synchronized (info) {
- assert !info.pinned;
- info.inode = manager.getInode();
- info.pinned = true;
- memoryBufferEntries.touch(info, false);
- is = new BlockInputStream(manager, info.memoryBlockCount, info.evicting);
- }
- success = true;
- } finally {
- this.memoryWritePermits.release();
- if (!success && manager != null) {
- manager.free(false);
- }
- }
- }
ObjectInputStream ois = new ObjectInputStream(is);
CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), ois.readInt(), serializer.deserialize(ois), ref, true);
return ce;
@@ -765,6 +758,9 @@
}
if (block != EMPTY_ADDRESS) {
if (demote) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.DETAIL)) {
+ LogManager.logDetail(LogConstants.CTX_BUFFER_MGR, "Assigning storage data block", block, "of size", sizeBasedStores[info.sizeIndex].blockSize); //$NON-NLS-1$ //$NON-NLS-2$
+ }
info.block = block;
} else {
BlockStore blockStore = sizeBasedStores[info.sizeIndex];
@@ -903,6 +899,9 @@
* Currently should be 48 bytes.
*/
final class PhysicalInfo extends BaseCacheEntry {
+
+ static final Exception sizeChanged = new Exception();
+
final Long gid;
//the memory inode and block count
int inode = BufferFrontedFileStoreCache.EMPTY_ADDRESS;
@@ -914,20 +913,26 @@
boolean pinned; //indicates that the entry is being read
boolean evicting; //indicates that the entry will be moved out of the memory buffer
boolean loading; //used by tier 1 cache to prevent double loads
+ boolean adding; //used to prevent double adds
- public PhysicalInfo(Long gid, Long id, int inode) {
- super(new CacheKey(id, 0, 0));
+ public PhysicalInfo(Long gid, Long id, int inode, int lastAccess) {
+ super(new CacheKey(id, lastAccess, 0));
this.inode = inode;
this.gid = gid;
}
- public void setSize(int size) {
- this.memoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
- int blocks = memoryBlockCount;
- this.sizeIndex = 0;
- while (blocks >= 1) {
+ public void setSize(int size) throws Exception {
+ int newMemoryBlockCount = (size>>BufferFrontedFileStoreCache.LOG_BLOCK_SIZE) + ((size&BufferFrontedFileStoreCache.BLOCK_MASK)>0?1:0);
+ if (this.memoryBlockCount != 0) {
+ if (newMemoryBlockCount != memoryBlockCount) {
+ throw sizeChanged;
+ }
+ return; //no changes
+ }
+ this.memoryBlockCount = newMemoryBlockCount;
+ while (newMemoryBlockCount >= 1) {
this.sizeIndex++;
- blocks>>=2;
+ newMemoryBlockCount>>=2;
}
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -203,13 +203,13 @@
old = fastGet(previous, prefersMemory.get(), true);
}
}
- CacheKey key = new CacheKey(oid, 0, old!=null?old.getKey().getOrderingValue():0);
+ CacheKey key = new CacheKey(oid, (int)readAttempts.get(), old!=null?old.getKey().getOrderingValue():0);
CacheEntry ce = new CacheEntry(key, sizeEstimate, batch, this.ref, false);
if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE)) {
LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Add batch to BufferManager", ce.getId(), "with size estimate", ce.getSizeEstimate()); //$NON-NLS-1$ //$NON-NLS-2$
}
cache.addToCacheGroup(id, ce.getId());
- addMemoryEntry(ce);
+ addMemoryEntry(ce, true);
return oid;
}
@@ -285,7 +285,7 @@
cache.remove(this.id, batch);
}
if (retain) {
- addMemoryEntry(ce);
+ addMemoryEntry(ce, false);
}
} finally {
cache.unlockForLoad(o);
@@ -693,8 +693,10 @@
if (ce == null) {
break;
}
- if (!memoryEntries.containsKey(ce.getId())) {
- continue; //not currently a valid eviction
+ synchronized (ce) {
+ if (!memoryEntries.containsKey(ce.getId())) {
+ continue; //not currently a valid eviction
+ }
}
boolean evicted = true;
try {
@@ -763,7 +765,7 @@
//there is a minute chance the batch was evicted
//this call ensures that we won't leak
if (memoryEntries.containsKey(batch)) {
- evictionQueue.touch(ce, false);
+ evictionQueue.touch(ce);
}
} else {
evictionQueue.remove(ce);
@@ -791,7 +793,7 @@
if (ce != null && ce.getObject() != null) {
referenceHit.getAndIncrement();
if (retain) {
- addMemoryEntry(ce);
+ addMemoryEntry(ce, false);
} else {
BufferManagerImpl.this.remove(ce, false);
}
@@ -823,11 +825,15 @@
}
}
- void addMemoryEntry(CacheEntry ce) {
+ void addMemoryEntry(CacheEntry ce, boolean initial) {
persistBatchReferences();
synchronized (ce) {
memoryEntries.put(ce.getId(), ce);
- evictionQueue.touch(ce, true);
+ if (initial) {
+ evictionQueue.add(ce);
+ } else {
+ evictionQueue.touch(ce);
+ }
}
activeBatchKB.getAndAdd(ce.getSizeEstimate());
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -30,6 +30,7 @@
protected ByteBuffer buf;
protected int bytesWritten;
+ private int startPosition;
public ExtensibleBufferedOutputStream() {
}
@@ -45,6 +46,7 @@
private void ensureBuffer() {
if (buf == null) {
buf = newBuffer();
+ startPosition = buf.position();
}
}
@@ -63,8 +65,11 @@
}
public void flush() throws IOException {
- if (buf != null && buf.position() > 0) {
- bytesWritten += flushDirect(buf.position());
+ if (buf != null) {
+ int bytes = buf.position() - startPosition;
+ if (bytes > 0) {
+ bytesWritten += flushDirect(bytes);
+ }
}
buf = null;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/LrfuEvictionQueue.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -60,11 +60,13 @@
return evictionQueue.remove(value.getKey()) != null;
}
- public void touch(V value, boolean initial) {
- if (!initial) {
- initial = evictionQueue.remove(value.getKey()) == null;
- }
- recordAccess(value, initial);
+ public boolean add(V value) {
+ return evictionQueue.put(value.getKey(), value) == null;
+ }
+
+ public void touch(V value) {
+ evictionQueue.remove(value.getKey());
+ recordAccess(value);
evictionQueue.put(value.getKey(), value);
}
@@ -85,14 +87,13 @@
return null;
}
- protected void recordAccess(V value, boolean initial) {
- assert Thread.holdsLock(value);
+ /**
+ * Callers should be synchronized on value
+ */
+ public void recordAccess(V value) {
CacheKey key = value.getKey();
int lastAccess = key.getLastAccess();
long currentClock = clock.get();
- if (initial && lastAccess == 0) {
- return; //we just want to timestamp this as created and not give it an ordering value
- }
float orderingValue = key.getOrderingValue();
orderingValue = computeNextOrderingValue(currentClock, lastAccess,
orderingValue);
@@ -128,7 +129,7 @@
break;
}
}
- this.maxInterval = i-1;
+ this.maxInterval = 1<<(i-1);
}
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBufferFrontedFileStoreCache.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -90,6 +90,16 @@
ce = get(cache, 3l, s);
assertEquals(cacheObject, ce.getObject());
+ //repeat the test to ensure proper cleanup
+ ce = new CacheEntry(4l);
+ cacheObject = Integer.valueOf(60000);
+ ce.setObject(cacheObject);
+ cache.addToCacheGroup(s.getId(), ce.getId());
+ cache.add(ce, s);
+
+ ce = get(cache, 4l, s);
+ assertEquals(cacheObject, ce.getObject());
+
cache.removeCacheGroup(1l);
assertEquals(0, cache.getDataBlocksInUse());
Modified: trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java
===================================================================
--- trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/runtime/src/test/java/org/teiid/dqp/service/buffer/TestLocalBufferService.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -94,8 +94,8 @@
svc.start();
BufferManager mgr = svc.getBufferManager();
- assertEquals(6570, mgr.getSchemaSize(schema));
- assertEquals(256, mgr.getProcessorBatchSize(schema));
+ assertEquals(3285, mgr.getSchemaSize(schema));
+ assertEquals(128, mgr.getProcessorBatchSize(schema));
}
}
Modified: trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java
===================================================================
--- trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-10-19 15:42:21 UTC (rev 3570)
+++ trunk/test-integration/common/src/test/java/org/teiid/jdbc/FakeServer.java 2011-10-20 01:51:50 UTC (rev 3571)
@@ -130,6 +130,7 @@
BufferServiceImpl bsi = new BufferServiceImpl();
bsi.setDiskDirectory(UnitTestUtil.getTestScratchPath());
this.dqp.setBufferService(bsi);
+ bsi.start();
}
this.dqp.setCacheFactory(new DefaultCacheFactory());
13 years, 2 months
teiid SVN: r3570 - trunk/client/src/main/java/org/teiid/client.
by teiid-commits@lists.jboss.org
Author: shawkins
Date: 2011-10-19 11:42:21 -0400 (Wed, 19 Oct 2011)
New Revision: 3570
Modified:
trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
Log:
TEIID-1750 pushing type codes down to datatypemanager
Modified: trunk/client/src/main/java/org/teiid/client/BatchSerializer.java
===================================================================
--- trunk/client/src/main/java/org/teiid/client/BatchSerializer.java 2011-10-19 14:17:27 UTC (rev 3569)
+++ trunk/client/src/main/java/org/teiid/client/BatchSerializer.java 2011-10-19 15:42:21 UTC (rev 3570)
@@ -93,13 +93,13 @@
throws IOException {
int code = DataTypeManager.getTypeCode(obj.getClass());
out.writeByte((byte)code);
- if (code == DataTypeManager.BOOLEAN) {
+ if (code == DataTypeManager.DefaultTypeCodes.BOOLEAN) {
if (Boolean.TRUE.equals(obj)) {
out.write((byte)1);
} else {
out.write((byte)0);
}
- } else if (code != DataTypeManager.OBJECT) {
+ } else if (code != DataTypeManager.DefaultTypeCodes.OBJECT) {
ColumnSerializer s = getSerializer(DataTypeManager.getDataTypeName(obj.getClass()), (byte)1);
s.writeObject(out, obj);
} else {
@@ -111,13 +111,13 @@
protected Object readObject(ObjectInput in) throws IOException,
ClassNotFoundException {
int code = in.readByte();
- if (code == DataTypeManager.BOOLEAN) {
+ if (code == DataTypeManager.DefaultTypeCodes.BOOLEAN) {
if (in.readByte() == (byte)0) {
return Boolean.FALSE;
}
return Boolean.TRUE;
}
- if (code != DataTypeManager.OBJECT) {
+ if (code != DataTypeManager.DefaultTypeCodes.OBJECT) {
ColumnSerializer s = getSerializer(DataTypeManager.getDataTypeName(DataTypeManager.getClass(code)), (byte)1);
return s.readObject(in);
}
13 years, 2 months