Author: shawkins
Date: 2011-09-27 23:15:08 -0400 (Tue, 27 Sep 2011)
New Revision: 3504
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
Removed:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BitSetTree.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java
Modified:
trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java
trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
Log:
TEIID-1750 converting storage over to a more flexible inode approach
Modified: trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java
===================================================================
--- trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java 2011-09-23
04:11:11 UTC (rev 3503)
+++ trunk/common-core/src/test/java/org/teiid/core/util/UnitTestUtil.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -28,7 +28,16 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.PrintWriter;
import java.io.Serializable;
+import java.io.Writer;
+import java.sql.Timestamp;
+import java.util.logging.ConsoleHandler;
+import java.util.logging.Formatter;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
import junit.framework.Assert;
import junit.framework.AssertionFailedError;
@@ -388,6 +397,7 @@
return filePath;
}
+ @SuppressWarnings("unchecked")
public static final <T extends Serializable> T helpSerialize(T object) throws
IOException, ClassNotFoundException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
@@ -398,5 +408,59 @@
return (T)ois.readObject();
}
+
+ public static void enableTraceLogging(String loggerName) {
+ Logger logger = Logger.getLogger(loggerName);
+ logger.setLevel(Level.FINEST);
+ if (logger.getHandlers().length > 0) {
+ for (Handler h : logger.getHandlers()) {
+ h.setLevel(Level.FINEST);
+ }
+ } else {
+ logger.setUseParentHandlers(false);
+ ConsoleHandler ch = new ConsoleHandler();
+ ch.setFormatter(new Formatter() {
+
+ @Override
+ public String format(LogRecord record) {
+ final StringBuilder result = new StringBuilder();
+ result.append(new Timestamp(record.getMillis()));
+ result.append(" "); //$NON-NLS-1$
+ result.append(record.getLoggerName());
+ result.append(" "); //$NON-NLS-1$
+ result.append(record.getLevel());
+ result.append(" "); //$NON-NLS-1$
+ result.append(record.getThreadID());
+ result.append(" "); //$NON-NLS-1$
+ result.append(record.getMessage());
+ result.append('\n');
+ if (record.getThrown() != null) {
+ record.getThrown().printStackTrace(new PrintWriter(new Writer() {
+ @Override
+ public void close() throws IOException {
+
+ }
+
+ @Override
+ public void flush() throws IOException {
+
+ }
+
+ @Override
+ public void write(char[] cbuf, int off, int len)
+ throws IOException {
+ result.append(new String(cbuf, off, len));
+ }
+ }));
+ result.append('\n');
+ }
+ return result.toString();
+ }
+ });
+ ch.setLevel(Level.FINEST);
+ logger.addHandler(ch);
+ }
+ }
+
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-09-23 04:11:11 UTC
(rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/Cache.java 2011-09-28 03:15:08 UTC
(rev 3504)
@@ -30,9 +30,9 @@
* Represents the storage strategy for the {@link BufferManager}
*/
public interface Cache extends StorageManager {
- void createCacheGroup(Long gid);
+ void createCacheGroup(Long gid); //called prior to adding an entry
Collection<Long> removeCacheGroup(Long gid);
- void addToCacheGroup(Long gid, Long oid); //called prior to adding an entry
+ void addToCacheGroup(Long gid, Long oid);
CacheEntry get(Long id, Serializer<?> serializer) throws TeiidComponentException;
void add(CacheEntry entry, Serializer<?> s);
void remove(Long gid, Long id);
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-23 04:11:11
UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/FileStore.java 2011-09-28 03:15:08
UTC (rev 3504)
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -223,6 +224,28 @@
};
}
+ public ByteBuffer getBuffer(long start, int length, boolean allocate) throws
IOException {
+ byte[] b = new byte[length];
+ if (!allocate) {
+ readFully(start, b, 0, length);
+ }
+ return ByteBuffer.wrap(b);
+ }
+
+ /**
+  * Writes the buffer contents from index 0 up to its limit into the store.
+  * The buffer is rewound first; array backed buffers are written directly,
+  * others are copied into a temporary array.
+  *
+  * @param bb the buffer to write
+  * @param start the store offset to write at
+  * @throws IOException if the write fails
+  */
+ public void updateFromBuffer(ByteBuffer bb, long start) throws IOException {
+     bb.rewind();
+     if (bb.hasArray()) {
+         write(start, bb.array(), bb.arrayOffset(), bb.limit());
+         return;
+     }
+     byte[] copy = new byte[bb.limit()];
+     bb.get(copy);
+     write(start, copy, 0, bb.limit());
+ }
+
public InputStream createInputStream(final long start) {
return createInputStream(start, -1);
}
@@ -242,7 +265,7 @@
};
}
- public FileStoreOutputStream createOutputStream(int maxMemorySize) {
+ public FileStoreOutputStream createOutputStream(int maxMemorySize) {
return new FileStoreOutputStream(maxMemorySize);
}
Deleted: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BitSetTree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BitSetTree.java 2011-09-23
04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BitSetTree.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -1,106 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
- package org.teiid.common.buffer.impl;
-
-import java.util.BitSet;
-
-/**
- * Extends a {@link BitSet} by adding a cumulative total and a
- * first level index to speed queries against large bitsets.
- */
-public class BitSetTree {
-
- public static final int MAX_INDEX = (1 << 24) - 1;
-
- private int bitsSet;
- private int totalBits;
- private short[] topVals = new short[1 << 9];
- private BitSet bitSet = new BitSet();
-
- public boolean get(int index) {
- if (index > MAX_INDEX) {
- throw new ArrayIndexOutOfBoundsException(index);
- }
- return bitSet.get(index);
- }
-
- /**
- * Set the given bit at the index. It's expected that the
- * bit currently has the opposite value.
- * @param bitIndex
- * @param value
- */
- public void set(int bitIndex, boolean value) {
- if (bitIndex > MAX_INDEX) {
- throw new ArrayIndexOutOfBoundsException(bitIndex);
- }
- bitSet.set(bitIndex, value);
- if (bitIndex >= totalBits) {
- totalBits = bitIndex + 1;
- }
- int topIndex = bitIndex >>> 15;
- int increment = value?1:-1;
- bitsSet+=increment;
- topVals[topIndex]+=increment;
- }
-
- public int getTotalBits() {
- return totalBits;
- }
-
- public int getBitsSet() {
- return bitsSet;
- }
-
- public int nextClearBit(int fromIndex) {
- int start = fromIndex >> 15;
- for (int i = start; i < topVals.length; i++) {
- if (topVals[i] < Short.MAX_VALUE) {
- int searchFrom = fromIndex;
- if (i > start) {
- searchFrom = i << 15;
- }
- return bitSet.nextClearBit(searchFrom);
- }
- }
- return -1;
- }
-
- public int nextSetBit(int fromIndex) {
- if (bitsSet == 0) {
- return -1;
- }
- int start = fromIndex >> 15;
- for (int i = fromIndex >> 15; i < topVals.length; i++) {
- if (topVals[i] > 0) {
- int searchFrom = fromIndex;
- if (i > start) {
- searchFrom = i << 15;
- }
- return bitSet.nextSetBit(searchFrom);
- }
- }
- return -1;
- }
-
-}
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -0,0 +1,191 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+ package org.teiid.common.buffer.impl;
+
+import java.util.BitSet;
+
+import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
+
+/**
+ * A bit set with a fixed maximum index whose words are stored in blocks
+ * obtained from a {@link BlockManager} rather than in an on-heap {@link BitSet}.
+ * A cumulative total and a per-block count of set bits (the first level index)
+ * are kept in memory to speed queries against large bitsets.
+ *
+ * The methods that read or modify bits are synchronized on this instance.
+ */
+public class BlockBitSetTree {
+
+    private static final int LOG_BITS_PER_BLOCK = FileStoreCache.LOG_BLOCK_SIZE + 3;
+    /** number of bits in one block, which is also the largest per-block count */
+    private static final int MAX_TOP_VALUE = 1 << LOG_BITS_PER_BLOCK;
+    private int maxIndex;
+    private int bitsSet;
+    /** count of set bits for each block */
+    private int[] topVals;
+    private BlockManager blockManager;
+    /** number of blocks allocated so far, blocks are allocated in index order */
+    private int blockCount = 0;
+
+    /**
+     * @param maxIndex the largest bit index that may be read or written
+     * @param blockManager the source of the blocks that hold the bits
+     */
+    public BlockBitSetTree(int maxIndex, BlockManager blockManager) {
+        this.maxIndex = maxIndex;
+        this.blockManager = blockManager;
+        this.topVals = new int[(maxIndex >> LOG_BITS_PER_BLOCK) + 1];
+    }
+
+    public int getMaxIndex() {
+        return maxIndex;
+    }
+
+    /**
+     * Set the given bit at the index.
+     * @param bitIndex
+     * @param value
+     * @throws ArrayIndexOutOfBoundsException if the index is negative or greater than the max index
+     */
+    public synchronized void set(int bitIndex, boolean value) {
+        getOrSet(bitIndex, value, true);
+    }
+
+    /**
+     * @return the value of the bit, bits in blocks that were never allocated are false
+     * @throws ArrayIndexOutOfBoundsException if the index is negative or greater than the max index
+     */
+    public synchronized boolean get(int bitIndex) {
+        return getOrSet(bitIndex, false, false);
+    }
+
+    /**
+     * Reads the bit and, when update is true, changes it to value.
+     * @return the value of the bit prior to the call
+     */
+    private boolean getOrSet(int bitIndex, boolean value, boolean update) {
+        if (bitIndex < 0 || bitIndex > maxIndex) {
+            throw new ArrayIndexOutOfBoundsException(bitIndex);
+        }
+        int blockIndex = bitIndex >> LOG_BITS_PER_BLOCK;
+        BlockInfo bb = null;
+        if (blockIndex >= blockCount) {
+            if (!update) {
+                return false;
+            }
+            for (; blockCount < blockIndex+1; blockCount++) {
+                bb = blockManager.allocateBlock(blockCount);
+                //clear every long of the block (BLOCK_SIZE bytes = BLOCK_SIZE/8 longs),
+                //bounded by the buffer limit so a shorter buffer cannot overflow
+                int byteLimit = Math.min(FileStoreCache.BLOCK_SIZE, bb.buf.limit());
+                for (int pos = 0; pos + 8 <= byteLimit; pos += 8) {
+                    bb.buf.putLong(pos, 0L);
+                }
+                //publish the cleared contents even if no bit ends up being changed
+                blockManager.updateBlock(bb);
+            }
+        } else {
+            bb = blockManager.getBlock(blockIndex);
+        }
+        int relativeIndex = bitIndex&(MAX_TOP_VALUE-1);
+        int longByteIndex = (relativeIndex>>6)<<3;
+        long word = bb.buf.getLong(longByteIndex);
+        //a long shift only uses the low 6 bits of the distance, which selects the bit within the word
+        long mask = 1L << bitIndex;
+        boolean currentValue = ((word & mask) != 0);
+        if (!update || currentValue == value) {
+            return currentValue;
+        }
+        if (value) {
+            word |= mask;
+        } else {
+            word &= ~mask;
+        }
+        bb.buf.putLong(longByteIndex, word);
+        blockManager.updateBlock(bb);
+        int increment = value?1:-1;
+        bitsSet+=increment;
+        topVals[blockIndex]+=increment;
+        return currentValue;
+    }
+
+    /**
+     * @return the total number of bits currently set
+     */
+    public synchronized int getBitsSet() {
+        return bitsSet;
+    }
+
+    /**
+     * Finds the first clear bit at or after fromIndex.  Full blocks are
+     * skipped and empty blocks are answered from the in-memory counts alone.
+     * @return the index of the clear bit or -1 if there is none
+     */
+    public synchronized int nextClearBit(int fromIndex) {
+        int start = fromIndex >> LOG_BITS_PER_BLOCK;
+        for (int i = start; i < topVals.length; i++) {
+            if (topVals[i] == MAX_TOP_VALUE) {
+                continue;
+            }
+            if (topVals[i] == 0) {
+                if (i == start) {
+                    return fromIndex;
+                }
+                return i * MAX_TOP_VALUE;
+            }
+            int relativeIndex = 0;
+            if (i == start) {
+                relativeIndex = fromIndex&(MAX_TOP_VALUE-1);
+            }
+            BlockInfo bb = blockManager.getBlock(i);
+
+            int longByteIndex = (relativeIndex>>6)<<3;
+
+            //ignore the bits of the first word that are below the starting bit
+            long word = ~bb.buf.getLong(longByteIndex) & (-1L << relativeIndex);
+
+            while (true) {
+                if (word != 0) {
+                    return longByteIndex*8 + (i * MAX_TOP_VALUE) + Long.numberOfTrailingZeros(word);
+                }
+                longByteIndex+=8;
+                if (longByteIndex > FileStoreCache.BLOCK_MASK) {
+                    break;
+                }
+                word = ~bb.buf.getLong(longByteIndex);
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Finds the first set bit at or after fromIndex.  Empty blocks are
+     * skipped and full blocks are answered from the in-memory counts alone.
+     * @return the index of the set bit or -1 if there is none
+     */
+    public synchronized int nextSetBit(int fromIndex) {
+        if (bitsSet == 0) {
+            return -1;
+        }
+        int start = fromIndex >> LOG_BITS_PER_BLOCK;
+        for (int i = start; i < topVals.length; i++) {
+            if (topVals[i] == 0) {
+                continue;
+            }
+            if (topVals[i] == MAX_TOP_VALUE) {
+                if (i == start) {
+                    return fromIndex;
+                }
+                return i * MAX_TOP_VALUE;
+            }
+            int relativeIndex = 0;
+            if (i == start) {
+                relativeIndex = fromIndex&(MAX_TOP_VALUE-1);
+            }
+            BlockInfo bb = blockManager.getBlock(i);
+
+            int longByteIndex = (relativeIndex>>6)<<3;
+
+            //ignore the bits of the first word that are below the starting bit
+            long word = bb.buf.getLong(longByteIndex) & (-1L << relativeIndex);
+
+            while (true) {
+                if (word != 0) {
+                    return longByteIndex*8 + (i * MAX_TOP_VALUE) + Long.numberOfTrailingZeros(word);
+                }
+                longByteIndex+=8;
+                if (longByteIndex > FileStoreCache.BLOCK_MASK) {
+                    break;
+                }
+                word = bb.buf.getLong(longByteIndex);
+            }
+        }
+        return -1;
+    }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockBitSetTree.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
(rev 0)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -0,0 +1,238 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
+
+/**
+ * Represents the logical structure of a cache group / directory
+ *
+ * Implemented by a closed hash table using a single step linear probe with delayed removal.
+ * Uses power of 2 hashing.
+ *
+ * Provides an extremely simple hash structure that rivals {@link HashMap} performance and
+ * is directly mapped to {@link ByteBuffer}s to avoid serialization overhead.
+ *
+ * Each row occupies 12 bytes: an 8 byte key followed by a 4 byte value.  A key of
+ * {@link #EMPTY} marks a slot that was never used and a key of REMOVED marks a
+ * slot whose entry was deleted.
+ *
+ * Does not expect keys or values to be negative.
+ */
+public class BlockClosedLongIntHashTable {
+
+    private enum Mode {
+        GET,
+        UPDATE,
+        REMOVE
+    }
+
+    private static final int BYTES_PER_ROW = 12; //8+4
+    /** number of rows held by a single block */
+    private static final int BLOCK_SIZE = FileStoreCache.BLOCK_DATA_BYTES/BYTES_PER_ROW;
+    private static final float LOAD_FACTOR = .7f;
+    private static final int MIN_SIZE = 1 << (31 - Integer.numberOfLeadingZeros(BLOCK_SIZE)); //should fit in a single block
+
+    static final int EMPTY = -1;
+    private static final int REMOVED = -2;
+
+    protected int size;
+    /** capacity - 1, or EMPTY while no block has been allocated */
+    protected int capacityMask = EMPTY;
+    protected BlockManager blockManager;
+
+    public BlockClosedLongIntHashTable(BlockManager blockManager) {
+        this.blockManager = blockManager;
+    }
+
+    /**
+     * Allocates and clears any blocks needed to grow to the given capacity
+     * and makes that capacity current.
+     */
+    private void init(int capacity) {
+        int currentBlockCount = blockCount(capacityMask + 1);
+        int desiredBlockCount = blockCount(capacity);
+        if (capacity > capacityMask) {
+            for (int i = currentBlockCount; i < desiredBlockCount; i++) {
+                BlockInfo bb = blockManager.allocateBlock(i);
+                empty(bb.buf, BLOCK_SIZE);
+                blockManager.updateBlock(bb);
+            }
+        }
+        capacityMask = capacity - 1;
+    }
+
+    /**
+     * Marks the first toIndex rows of the buffer as EMPTY.
+     */
+    private void empty(ByteBuffer bb, int toIndex) {
+        bb.position(0);
+        for (int j = 0; j < toIndex; j++) {
+            bb.putLong(j * BYTES_PER_ROW, EMPTY);
+        }
+    }
+
+    /**
+     * @return the number of blocks needed to hold the given number of rows
+     */
+    private static int blockCount(int size) {
+        int currentBlockCount = (size) / BLOCK_SIZE;
+        if (size%BLOCK_SIZE > 0) {
+            currentBlockCount++;
+        }
+        return currentBlockCount;
+    }
+
+    /**
+     * Associates the value with the key, doubling the table when the load
+     * factor is exceeded.
+     * @return the previous value or {@link #EMPTY} if the key was not present
+     */
+    public synchronized int put(long key, int value) {
+        int result = getOrUpdate(key, value, Mode.UPDATE);
+        if (result == EMPTY && ++size > LOAD_FACTOR*capacityMask) {
+            int newCapacity = (capacityMask+1)<<1;
+            int oldLength = capacityMask + 1;
+            init(newCapacity);
+            rehash(oldLength, newCapacity);
+        }
+        return result;
+    }
+
+    /**
+     * @return the value for the key or {@link #EMPTY} if the key is not present
+     */
+    public synchronized int get(long key) {
+        return getOrUpdate(key, EMPTY, Mode.GET);
+    }
+
+    /**
+     * Removes the key, halving the table when it becomes sparse.
+     * @return the removed value or {@link #EMPTY} if the key was not present
+     */
+    public synchronized int remove(long key) {
+        int result = getOrUpdate(key, EMPTY, Mode.REMOVE);
+        if (result == EMPTY) {
+            return result;
+        }
+        size--;
+        if (size*LOAD_FACTOR < (capacityMask>>3) && ((capacityMask+1)>>1) >= MIN_SIZE) {
+            //reduce the size of the table by half
+            int oldLength = capacityMask + 1;
+            capacityMask >>= 1;
+            rehash(oldLength, oldLength>>1);
+            int oldBlocks = blockCount(oldLength);
+            int newBlocks = blockCount(capacityMask +1);
+            for (int i = oldBlocks-1; i >= newBlocks; i--) {
+                blockManager.freeBlock(i);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Re-inserts every live row of the old table using the current capacity mask.
+     * Blocks that remain part of the new table are copied and cleared before
+     * their rows are re-inserted.
+     *
+     * NOTE(review): rows are re-inserted while later blocks still hold rows that
+     * have not been rehashed yet.  An entry that probes past such rows (for example
+     * across the wrap-around) may become unreachable once those rows move - confirm
+     * whether all live rows need to be collected before any are re-inserted.
+     */
+    private void rehash(int oldLength, int newLength) {
+        BlockInfo lastBlockInfo = null;
+        ByteBuffer lastBlock = null;
+        for (int i = 0; i < oldLength; i++) {
+            int relativeIndex = i%BLOCK_SIZE;
+            if (lastBlock == null || relativeIndex == 0) {
+                int lastIndex = i/BLOCK_SIZE;
+                lastBlockInfo = blockManager.getBlock(lastIndex);
+                lastBlock = lastBlockInfo.buf;
+                if (i < newLength) {
+                    //read from a private copy since the block itself is reused by the new table
+                    byte[] buf = new byte[FileStoreCache.BLOCK_DATA_BYTES];
+                    lastBlock.position(0);
+                    lastBlock.get(buf);
+                    ByteBuffer copyBlock = ByteBuffer.wrap(buf);
+                    empty(lastBlock, Math.min(BLOCK_SIZE, oldLength - lastIndex*BLOCK_SIZE));
+                    blockManager.updateBlock(lastBlockInfo);
+                    lastBlock = copyBlock;
+                }
+            }
+            lastBlock.position(relativeIndex*BYTES_PER_ROW);
+            long oldKey = lastBlock.getLong();
+            int oldValue = lastBlock.getInt();
+            if (oldKey != REMOVED && oldKey != EMPTY) {
+                getOrUpdate(oldKey, oldValue, Mode.UPDATE);
+            }
+        }
+    }
+
+    /**
+     * Probes for the key and performs the operation selected by the mode.
+     *
+     * The probe continues past REMOVED slots until the key or an EMPTY slot is
+     * found, so that an update never stores a second copy of a key that sits
+     * further along the chain.  A new entry reuses the first REMOVED slot seen.
+     * At most one pass over the table is made, which guarantees termination
+     * even when no EMPTY slot is left.
+     *
+     * @return the value previously associated with the key or {@link #EMPTY}
+     * @throws IllegalStateException if an update finds neither the key nor a free slot
+     */
+    @SuppressWarnings("null")
+    protected int getOrUpdate(long key, int value, Mode mode) {
+        if (capacityMask == EMPTY) {
+            if (mode == Mode.GET || mode == Mode.REMOVE) {
+                return EMPTY;
+            }
+            init(MIN_SIZE);
+        }
+        int i = hashIndex(key);
+        BlockInfo lastBlockInfo = null;
+        long old = EMPTY;
+        int position = 0;
+        int firstRemoved = -1; //slot index of the first REMOVED row on the probe path
+        boolean exhausted = true;
+        for (int probes = 0; probes <= capacityMask; probes++) {
+            int relativeIndex = i%BLOCK_SIZE;
+            if (lastBlockInfo == null || relativeIndex == 0) {
+                int index = i/BLOCK_SIZE;
+                lastBlockInfo = blockManager.getBlock(index);
+            }
+            position = relativeIndex*BYTES_PER_ROW;
+            old = lastBlockInfo.buf.getLong(position);
+            if (old == EMPTY || old == key) {
+                exhausted = false;
+                break;
+            }
+            if (old == REMOVED && firstRemoved < 0) {
+                firstRemoved = i;
+            }
+            i = (i + 1) & capacityMask;
+        }
+        boolean found = !exhausted && old != EMPTY && old != REMOVED;
+        int result = EMPTY;
+        if (found) {
+            result = lastBlockInfo.buf.getInt(position + 8);
+        }
+        switch (mode) {
+        case GET:
+            return result;
+        case UPDATE:
+            if (!found && firstRemoved >= 0) {
+                //the key is absent, reuse the earliest REMOVED slot
+                lastBlockInfo = blockManager.getBlock(firstRemoved/BLOCK_SIZE);
+                position = (firstRemoved%BLOCK_SIZE)*BYTES_PER_ROW;
+            } else if (exhausted) {
+                throw new IllegalStateException("No free slot is available for key " + key); //$NON-NLS-1$
+            }
+            lastBlockInfo.buf.putLong(position, key);
+            lastBlockInfo.buf.putInt(position + 8, value);
+            blockManager.updateBlock(lastBlockInfo);
+            return result;
+        case REMOVE:
+            if (!found) {
+                return EMPTY;
+            }
+            lastBlockInfo.buf.putLong(position, REMOVED);
+            blockManager.updateBlock(lastBlockInfo);
+            return result;
+        default:
+            throw new AssertionError();
+        }
+    }
+
+    private int hashIndex(long key) {
+        //start with the usual long hash
+        int primaryHash = (int)(key ^ (key >>> 32));
+        //allow the lower bits to spread the entries (multiplies by 5, then by 9)
+        primaryHash += primaryHash << 2;
+        primaryHash += primaryHash << 3;
+        return primaryHash & capacityMask;
+    }
+
+    public synchronized int size() {
+        return size;
+    }
+
+    /**
+     * Removes all entries and releases the blocks through the block manager.
+     * The table is left in its initial empty state so that the released
+     * blocks are not read again.
+     * @return the entries that were held by the table
+     */
+    public synchronized Map<Long, Integer> remove() {
+        Map<Long, Integer> result = new HashMap<Long, Integer>();
+        BlockInfo lastBlockInfo = null;
+        int blockIndex = 0;
+        for (int i = capacityMask; i >= 0; i--) {
+            int relativeIndex = i%BLOCK_SIZE;
+            if (lastBlockInfo == null || relativeIndex == BLOCK_SIZE - 1) {
+                if (lastBlockInfo != null) {
+                    blockManager.freeBlock(blockIndex);
+                }
+                blockIndex = i/BLOCK_SIZE;
+                lastBlockInfo = blockManager.getBlock(blockIndex);
+            }
+            lastBlockInfo.buf.position(relativeIndex*BYTES_PER_ROW);
+            long key = lastBlockInfo.buf.getLong();
+            if (key != EMPTY && key != REMOVED) {
+                result.put(key, lastBlockInfo.buf.getInt());
+            }
+        }
+        blockManager.free();
+        size = 0;
+        capacityMask = EMPTY;
+        return result;
+    }
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockClosedLongIntHashTable.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
(rev 0)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
+
+/**
+ * Represents an INode
+ *
+ * Returned {@link BlockInfo} instances may be shared. If shared, there are no
+ * guarantees about the buffer's position and mark. In particular, system/index
+ * blocks can be used by multiple threads, so relative methods should be
+ * avoided, but may be used for exclusive write operations.
+ * Otherwise the position will be 0.
+ *
+ * Due to buffermanager locking, non-index data blocks can be assumed to be thread-safe.
+ *
+ * As used by the block structures added with this interface: blocks are addressed
+ * by a zero-based logical index, {@link #updateBlock(BlockInfo)} is called after a
+ * block's buffer has been modified, {@link #freeBlock(int)} releases a single block
+ * and {@link #free()} is called once the whole structure is discarded.
+ * NOTE(review): the exact persistence semantics of these methods and the meaning of
+ * {@link #getInode()} are defined by the implementation - confirm there.
+ */
+public interface BlockManager {
+
+ int getInode();
+
+ BlockInfo allocateBlock(int index);
+
+ BlockInfo getBlock(int index);
+
+ void updateBlock(BlockInfo block);
+
+ void freeBlock(int index);
+
+ void free();
+
+}
Property changes on:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BlockManager.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -31,6 +31,7 @@
import java.lang.ref.WeakReference;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@@ -250,6 +251,7 @@
private boolean useWeakReferences = true;
private boolean inlineLobs = true;
private int targetBytesPerRow = TARGET_BYTES_PER_ROW;
+ private int maxSoftReferences;
private ReentrantLock lock = new ReentrantLock(true);
private Condition batchesFreed = lock.newCondition();
@@ -260,10 +262,23 @@
private LinkedHashMap<Long, CacheEntry> memoryEntries = new
LinkedHashMap<Long, CacheEntry>(16, .75f, false);
private LinkedHashMap<Long, CacheEntry> tenuredMemoryEntries = new
LinkedHashMap<Long, CacheEntry>(16, .75f, true);
- private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache; //limited
size based upon the memory settings
- private ConcurrentHashMap<Long, BatchSoftReference> softCache = new
ConcurrentHashMap<Long, BatchSoftReference>(); //"unlimitted" size
maintained by reference queue
+ //limited size reference caches based upon the memory settings
+ private WeakReferenceHashedValueCache<CacheEntry> weakReferenceCache;
+ private Map<Long, BatchSoftReference> softCache =
Collections.synchronizedMap(new LinkedHashMap<Long, BatchSoftReference>(16, .75f,
false) {
+ private static final long serialVersionUID = 1L;
+
+ protected boolean removeEldestEntry(Map.Entry<Long,BatchSoftReference> eldest) {
+ if (size() > maxSoftReferences) {
+ BatchSoftReference bsr = eldest.getValue();
+ maxReserveKB.addAndGet(bsr.sizeEstimate);
+ bsr.clear();
+ return true;
+ }
+ return false;
+ };
+ });
- private Cache cache;
+ Cache cache;
private Map<String, TupleReference> tupleBufferMap = new
ConcurrentHashMap<String, TupleReference>();
private ReferenceQueue<TupleBuffer> tupleBufferQueue = new
ReferenceQueue<TupleBuffer>();
@@ -430,11 +445,12 @@
if (this.maxProcessingKBOrig < 0) {
this.maxProcessingKB = Math.max(Math.min(8 * processorBatchSize, Integer.MAX_VALUE),
(int)(.1 * maxMemory)/maxActivePlans);
}
+ int memoryBatches = (this.maxProcessingKB * maxActivePlans + this.getMaxReserveKB()) /
(processorBatchSize * targetBytesPerRow / 1024);
+ int logSize = 39 - Integer.numberOfLeadingZeros(memoryBatches);
if (useWeakReferences) {
- int memoryBatches = (this.maxProcessingKB * maxActivePlans + this.getMaxReserveKB()) /
(processorBatchSize * targetBytesPerRow / 1024);
- int logSize = 39 - Integer.numberOfLeadingZeros(memoryBatches);
weakReferenceCache = new WeakReferenceHashedValueCache<CacheEntry>(Math.min(20,
logSize));
}
+ this.maxSoftReferences = 1 << Math.max(28, logSize+2);
}
@Override
@@ -551,7 +567,7 @@
}
private void createSoftReference(CacheEntry ce) {
- BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE, ce.getSizeEstimate());
+ BatchSoftReference ref = new BatchSoftReference(ce, SOFT_QUEUE,
ce.getSizeEstimate()/2);
softCache.put(ce.getId(), ref);
maxReserveKB.addAndGet(- ce.getSizeEstimate()/2);
}
@@ -584,12 +600,11 @@
return ce;
}
if (prefersMemory) {
- BatchSoftReference bsr = softCache.get(batch);
+ BatchSoftReference bsr = softCache.remove(batch);
if (bsr != null) {
ce = bsr.get();
if (ce != null) {
- softCache.remove(batch);
- maxReserveKB.addAndGet(bsr.sizeEstimate/2);
+ maxReserveKB.addAndGet(bsr.sizeEstimate);
}
}
} else if (useWeakReferences) {
@@ -611,6 +626,9 @@
}
void remove(Long gid, Long batch, boolean prefersMemory) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR, MessageLevel.TRACE))
{
+ LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, "Removing batch from
BufferManager", batch); //$NON-NLS-1$
+ }
CacheEntry ce = fastGet(batch, prefersMemory, false);
if (ce == null) {
cache.remove(gid, batch);
@@ -661,8 +679,7 @@
break;
}
softCache.remove(ref.key);
- maxReserveKB.addAndGet(ref.sizeEstimate/2);
- ref.sizeEstimate = 0;
+ maxReserveKB.addAndGet(ref.sizeEstimate);
ref.clear();
}
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/ExtensibleBufferedOutputStream.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -24,32 +24,37 @@
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
public abstract class ExtensibleBufferedOutputStream extends OutputStream {
- protected int bytesWritten;
- protected byte buf[];
- protected int count;
+ protected ByteBuffer buf;
- public ExtensibleBufferedOutputStream(byte[] buf) {
- this.buf = buf;
+ public ExtensibleBufferedOutputStream() {
}
public void write(int b) throws IOException {
- if (count >= buf.length) {
+ ensureBuffer();
+ if (buf.remaining() == 0) {
flush();
}
- buf[count++] = (byte)b;
+ buf.put((byte)b);
}
+ /**
+  * Obtains a buffer from {@link #newBuffer()} when none is currently held.
+  */
+ private void ensureBuffer() {
+     if (buf != null) {
+         return;
+     }
+     buf = newBuffer();
+ }
+
public void write(byte b[], int off, int len) throws IOException {
while (true) {
- int toCopy = Math.min(buf.length - count, len);
- System.arraycopy(b, off, buf, count, toCopy);
- count += toCopy;
+ ensureBuffer();
+ int toCopy = Math.min(buf.remaining(), len);
+ buf.put(b, off, toCopy);
len -= toCopy;
off += toCopy;
- if (count < buf.length) {
+ if (buf.remaining() > 0) {
break;
}
flush();
@@ -57,12 +62,13 @@
}
public void flush() throws IOException {
- if (count > 0) {
+ if (buf != null && buf.position() > 0) {
flushDirect();
}
- bytesWritten += count;
- count = 0;
+ buf = null;
}
+
+ protected abstract ByteBuffer newBuffer();
protected abstract void flushDirect() throws IOException;
@@ -71,8 +77,4 @@
flush();
}
- public int getBytesWritten() {
- return bytesWritten;
- }
-
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStorageManager.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -151,6 +151,9 @@
@Override
public synchronized void setLength(long length) throws IOException {
+ if (fileInfo == null) {
+ fileInfo = new FileInfo(createFile(name));
+ }
try {
fileInfo.open().setLength(length);
} finally {
@@ -165,7 +168,7 @@
fileInfo.delete();
}
}
-
+
}
// Initialization
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java 2011-09-23
04:11:11 UTC (rev 3503)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/FileStoreCache.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -27,15 +27,12 @@
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import org.teiid.common.buffer.Cache;
import org.teiid.common.buffer.CacheEntry;
@@ -50,298 +47,604 @@
import org.teiid.query.QueryPlugin;
/**
- * Implements storage and caching against a {@link FileStore} abstraction.
- * caching uses a block paradigm, where the first block has the format
- * long gid | long oid | short tail bytes | short sector table length | [3 byte int
sector ...]
- *
- * TODO: back non-fragmented block information onto the block address
+ * Implements storage against a {@link FileStore} abstraction using a filesystem
paradigm.
+ * The filesystem uses a 31bit address space on top of 2^14 byte blocks.
+ *
+ * Therefore there is 2^31*2^14 = 2^45 or 32 terabytes max of addressable space.
+ *
+ * Some amount of the space is taken up by system information (inodes and use flags).
+ * This is held in a separate file.
+ *
+ * The 64 byte inode format is:
+ * 14 32 bit direct block pointers
+ * 1 32 bit block indirect pointer
+ * 1 32 bit block doubly indirect pointer
+ *
+ * The data block format is:
+ * data bytes | long gid | long oid | three byte int block num
+ *
+ * The gid/oid are stored with the block so that defrag/compaction can be performed
+ * with minimal blocking/lookups.
+ *
+ * This means that the maximum number of blocks available to a "file" is
+ * 14 + (2^14-19)/4 + ((2^14-19)/4)^2 ~= 2^24
+ *
+ * Thus the max serialized object size is: 2^24*(2^14-19) ~= 256GB.
+ *
+ * The root directory "physicalMapping" is held in memory for performance,
+ * but could itself be switched to using a block map. It will grow in
+ * proportion to the number of tables/tuplebuffers in use.
+ *
+ * TODO: defragment
+ * TODO: lobs could also be supported in this structure.
*/
public class FileStoreCache implements Cache, StorageManager {
- private static final int IO_BUFFER_SIZE = 1 << 14;
- private static final int DATA_START = 1 << 13;
- private static final int MAX_BLOCKS = 2724; //implies that the max size is ~ 42.5 mb
+ //TODO allow the block size to be configurable
+ static final int LOG_BLOCK_SIZE = 14;
+ static final int ADDRESS_BITS = 31;
+ static final int SYSTEM_MASK = 1<<ADDRESS_BITS;
+ static final int BLOCK_SIZE = 1 << LOG_BLOCK_SIZE;
+ static final int BLOCK_MASK = BLOCK_SIZE - 1;
+ static final int BLOCK_OVERHEAD = 8+8+3;
+ static final int BLOCK_DATA_BYTES = BLOCK_SIZE - BLOCK_OVERHEAD;
+ static final int BYTES_PER_BLOCK_ADDRESS = 4;
+ static final int ADDRESSES_PER_BLOCK = BLOCK_DATA_BYTES/BYTES_PER_BLOCK_ADDRESS;
+ static final int INODE_BYTES = 16*BYTES_PER_BLOCK_ADDRESS;
+ static final int DIRECT_POINTERS = 14;
+ static final int MAX_INDIRECT = DIRECT_POINTERS + ADDRESSES_PER_BLOCK;
+ static final int MAX_DOUBLE_INDIRECT = MAX_INDIRECT + ADDRESSES_PER_BLOCK *
ADDRESSES_PER_BLOCK;
+ static final int EMPTY_ADDRESS = -1;
+
+ /**
+  * Output stream whose buffers are the blocks handed out by a BlockManager.
+  * Every buffer request allocates the next logical block; every direct flush
+  * passes the current block back to the manager.
+  */
+ private final class BlockOutputStream extends ExtensibleBufferedOutputStream {
+     private final BlockManager blockManager;
+     //logical number of the most recently allocated block, -1 before the first allocation
+     int blockNum = -1;
+     //block most recently returned by newBuffer(); cleared by flushDirect()
+     BlockInfo bi;
+
+     private BlockOutputStream(BlockManager blockManager) {
+         this.blockManager = blockManager;
+     }
+
+     @Override
+     protected ByteBuffer newBuffer() {
+         blockNum++;
+         BlockInfo allocated = blockManager.allocateBlock(blockNum);
+         bi = allocated;
+         return allocated.buf;
+     }
+
+     @Override
+     protected void flushDirect() throws IOException {
+         blockManager.updateBlock(bi);
+         bi = null;
+     }
+ }
+
+ /**
+  * BlockManager over system blocks, addressed relative to a fixed starting offset.
+  * Only lookup, allocation and update are supported; the inode and free
+  * operations throw AssertionError.
+  */
+ private final class BitSetBlockManager implements BlockManager {
+     //added to every logical index to form the system block address
+     final int offset;
+
+     BitSetBlockManager(int offset) {
+         this.offset = offset;
+     }
+
+     //resolves a logical index to its system block
+     private BlockInfo systemBlock(int index, boolean allocate) {
+         return getPhysicalBlock(index + offset, true, allocate);
+     }
+
+     @Override
+     public BlockInfo getBlock(int index) {
+         return systemBlock(index, false);
+     }
+
+     @Override
+     public BlockInfo allocateBlock(int index) {
+         return systemBlock(index, true);
+     }
+
+     @Override
+     public void updateBlock(BlockInfo info) {
+         updatePhysicalBlock(info);
+     }
+
+     @Override
+     public int getInode() {
+         throw new AssertionError();
+     }
+
+     @Override
+     public void freeBlock(int index) {
+         throw new AssertionError();
+     }
+
+     @Override
+     public void free() {
+         throw new AssertionError();
+     }
+
+ }
- static class TryFreeParameter {
- Long gid;
- Long oid;
- Integer block;
+ private enum Mode {
+ GET,
+ UPDATE,
+ ALLOCATE
+ }
+
+ /**
+  * A block buffer together with the addressing details needed to write it back.
+  */
+ static final class BlockInfo {
+     //true when the block lives in the system store rather than the data store
+     final boolean system;
+     //block address within its store
+     final int physicalAddress;
+     //byte offset of an inode slice within its block; whole blocks are created with -1
+     final int inodeOffset;
+     final ByteBuffer buf;
+     //set when the block is put into the block cache
+     boolean dirty;
+
+     BlockInfo(boolean system, ByteBuffer ib, int index, int inodeOffset) {
+         this.system = system;
+         this.physicalAddress = index;
+         this.inodeOffset = inodeOffset;
+         this.buf = ib;
+     }
+ }
+
+ private final class InodeBlockManager implements BlockManager {
+ private final int inode;
+ private final long gid;
+ private final long oid;
+ private int lastBlock = -1;
+ boolean trackLast;
- public TryFreeParameter(Long gid, Long oid, Integer block) {
+ InodeBlockManager(long gid, long oid, int inode) {
+ if (inode == EMPTY_ADDRESS) {
+ synchronized (inodesInuse) {
+ inode = inodesInuse.nextClearBit(0);
+ if (inode == -1) {
+ throw new TeiidRuntimeException("no inodes available"); //$NON-NLS-1$
+ }
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_DQP, "Allocating inode", inode,
"to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ inodesInuse.set(inode, true);
+ }
+ int inodeBlock = inode/inodesPerBlock;
+ int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
+ BlockInfo bb = getInodeSubBlock(inodeBlock, inodePosition);
+ bb.buf.putInt(EMPTY_ADDRESS);
+ updatePhysicalBlock(bb);
+ }
+ this.inode = inode;
this.gid = gid;
this.oid = oid;
- this.block = block;
}
- }
-
- private class Segment {
- int id;
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- List<TryFreeParameter> freed = Collections.synchronizedList(new
ArrayList<TryFreeParameter>());
- FileStore store;
- BitSetTree inuse = new BitSetTree();
- BitSetTree fragmentedFlags = new BitSetTree();
- public Segment(int id) {
- this.id = id;
+ @Override
+ public int getInode() {
+ return inode;
}
-
- void tryFree(Long gid, Long oid, Integer block) {
- if (lock.writeLock().tryLock()) {
- try {
- free(gid, oid, block);
- } finally {
- lock.writeLock().unlock();
+
+ @Override
+ public void updateBlock(BlockInfo info) {
+ updatePhysicalBlock(info);
+ }
+
+ /**
+  * Looks up the data block stored at the given logical index and returns it
+  * positioned at 0 and limited to the data portion of the block.
+  */
+ @Override
+ public BlockInfo getBlock(int index) {
+     int physical = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.GET);
+     BlockInfo info = getPhysicalBlock(physical, false, false);
+     ByteBuffer data = info.buf;
+     data.position(0);
+     data.limit(BLOCK_DATA_BYTES);
+     return info;
+ }
+
+ private int getOrUpdateDataBlockIndex(int index, int value, Mode mode) {
+ if (index >= MAX_DOUBLE_INDIRECT) {
+ throw new TeiidRuntimeException("Max block number exceeded");
//$NON-NLS-1$
+ }
+ int dataBlock = 0;
+ int position = 0;
+ int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
+ BlockInfo info = getInodeSubBlock(inode/inodesPerBlock, inodePosition);
+ if (index >= MAX_INDIRECT) {
+ position = BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1);
+ BlockInfo next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT,
MAX_DOUBLE_INDIRECT+1, value, mode);
+ if (next != info) {
+ info = next;
+ //should have traversed to the secondary
+ int indirectAddressBlock = (index - MAX_INDIRECT) / ADDRESSES_PER_BLOCK;
+ position = indirectAddressBlock * BYTES_PER_BLOCK_ADDRESS;
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_DATA_BYTES) {
+ info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ }
+ next = updateIndirectBlockInfo(info, index, position, MAX_INDIRECT +
indirectAddressBlock * ADDRESSES_PER_BLOCK, MAX_DOUBLE_INDIRECT + 2 +
indirectAddressBlock, value, mode);
+ if (next != info) {
+ info = next;
+ position = ((index - MAX_INDIRECT)%ADDRESSES_PER_BLOCK) * BYTES_PER_BLOCK_ADDRESS;
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_DATA_BYTES) {
+ info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ }
+ }
}
+ } else if (index >= DIRECT_POINTERS) {
+ //indirect
+ position = BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS;
+ BlockInfo next = updateIndirectBlockInfo(info, index, position, DIRECT_POINTERS,
MAX_DOUBLE_INDIRECT, value, mode);
+ if (next != info) {
+ info = next;
+ position = (index - DIRECT_POINTERS) * BYTES_PER_BLOCK_ADDRESS;
+ if (mode == Mode.ALLOCATE && position + BYTES_PER_BLOCK_ADDRESS <
BLOCK_DATA_BYTES) {
+ info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ }
+ }
} else {
- freed.add(new TryFreeParameter(gid, oid, block));
+ position = BYTES_PER_BLOCK_ADDRESS*index;
+ if (mode == Mode.ALLOCATE) {
+ info.buf.putInt(position + BYTES_PER_BLOCK_ADDRESS, EMPTY_ADDRESS);
+ }
}
+ if (mode == Mode.ALLOCATE) {
+ dataBlock = nextBlock();
+ info.buf.putInt(position, dataBlock);
+ updatePhysicalBlock(info);
+ } else {
+ dataBlock = info.buf.getInt(position);
+ if (mode == Mode.UPDATE) {
+ info.buf.putInt(position, value);
+ updatePhysicalBlock(info);
+ }
+ }
+ return dataBlock;
}
- private void free(Long gid, Long oid, Integer block) {
- if (block == null) {
- Map<Long, Integer> map = physicalMapping.get(gid);
- if (map == null) {
- return;
+ private BlockInfo updateIndirectBlockInfo(BlockInfo info, int index, int position, int
cutOff, int blockId, int value, Mode mode) {
+ int sib_index = info.buf.getInt(position);
+ boolean newBlock = false;
+ if (index == cutOff) {
+ if (mode == Mode.ALLOCATE) {
+ sib_index = nextBlock();
+ info.buf.putInt(position, sib_index);
+ updatePhysicalBlock(info);
+ newBlock = true;
+ } else if (mode == Mode.UPDATE && value == EMPTY_ADDRESS) {
+ freeDataBlock(sib_index);
+ return info;
}
- block = map.remove(oid);
- if (block == null) {
- return;
+ }
+ info = getPhysicalBlock(sib_index, false, false);
+ if (newBlock) {
+ putBlockId(blockId, info.buf);
+ }
+ return info;
+ }
+
+ private int nextBlock() {
+ int next = -1;
+ synchronized (blocksInuse) {
+ next = blocksInuse.nextClearBit(lastBlock + 1);
+ if (next == -1) {
+ throw new TeiidRuntimeException("no freespace available"); //$NON-NLS-1$
}
+ blocksInuse.set(next, true);
}
- byte[] buf = new byte[DATA_START];
- try {
- inuse.set(block, false);
- boolean fragmented = false;
- if (fragmentedFlags.get(block)) {
- fragmented = true;
- store.read(block << 14, buf, 0, buf.length);
- fragmentedFlags.set(block, false);
- } else {
- //TODO: if not fragmented come up with a better approach than a disk read
- store.read(block << 14, buf, 0, 20);
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_DQP, "Allocating block", next,
"to", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ if (trackLast) {
+ lastBlock = next;
+ }
+ return next;
+ }
+
+ @Override
+ public void freeBlock(int index) {
+ int dataBlock = getOrUpdateDataBlockIndex(index, EMPTY_ADDRESS, Mode.UPDATE);
+ freeDataBlock(dataBlock);
+ }
+
+ private void freeDataBlock(int dataBlock) {
+ if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+ LogManager.logTrace(LogConstants.CTX_DQP, "freeing data block", dataBlock,
"for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+ }
+ blockCache.remove(dataBlock);
+ blocksInuse.set(dataBlock, false);
+ }
+
+ BlockInfo getInodeSubBlock(int inodeBlock, int inodePosition) {
+ BlockInfo bi = getPhysicalBlock(inodeBlock + blocksInUseBlocks + inodesInUseBlocks,
true, false);
+ ByteBuffer bb = bi.buf.duplicate();
+ bb.position(inodePosition);
+ bb.limit(inodePosition + INODE_BYTES);
+ bb = bb.slice();
+ return new BlockInfo(true, bb, inodeBlock + blocksInUseBlocks + inodesInUseBlocks,
inodePosition);
+ }
+
+ /**
+  * Releases the blocks referenced by this inode and clears the inode's in-use flag.
+  * Direct pointers are freed first, then the indirect block, then the doubly
+  * indirect block; processing stops at the first level that hits an empty pointer.
+  */
+ @Override
+ public void free() {
+     int inodeBlock = inode/inodesPerBlock;
+     int inodePosition = INODE_BYTES*(inode%inodesPerBlock);
+     BlockInfo bi = getInodeSubBlock(inodeBlock, inodePosition);
+     ByteBuffer ib = bi.buf;
+     //read both indirect pointers up front, before freeBlock advances the buffer position
+     int indirectIndexBlock = ib.getInt(ib.position() + BYTES_PER_BLOCK_ADDRESS*DIRECT_POINTERS);
+     int doublyIndirectIndexBlock = ib.getInt(ib.position() + BYTES_PER_BLOCK_ADDRESS*(DIRECT_POINTERS+1));
+     boolean freedAll = freeBlock(ib, DIRECT_POINTERS, true);
+     if (LogManager.isMessageToBeRecorded(LogConstants.CTX_DQP, MessageLevel.TRACE)) {
+         LogManager.logTrace(LogConstants.CTX_DQP, "freeing inode", inode, "for", gid, oid); //$NON-NLS-1$ //$NON-NLS-2$
+     }
+     inodesInuse.set(inode, false);
+     if (!freedAll || indirectIndexBlock == EMPTY_ADDRESS) {
+         return;
+     }
+     freedAll = freeIndirectBlock(indirectIndexBlock);
+     if (!freedAll || doublyIndirectIndexBlock == EMPTY_ADDRESS) {
+         return;
+     }
+     BlockInfo bb = getPhysicalBlock(doublyIndirectIndexBlock, false, false);
+     //a cached buffer keeps the position its last user left behind; the pointers
+     //are read relatively, so start from 0 exactly as freeIndirectBlock does
+     bb.buf.position(0);
+     freeBlock(bb.buf, ADDRESSES_PER_BLOCK, false);
+     freeDataBlock(doublyIndirectIndexBlock);
+ }
+
+ private boolean freeIndirectBlock(int indirectIndexBlock) {
+ BlockInfo bb = getPhysicalBlock(indirectIndexBlock, false, false);
+ bb.buf.position(0);
+ boolean freedAll = freeBlock(bb.buf, ADDRESSES_PER_BLOCK, true);
+ freeDataBlock(indirectIndexBlock);
+ return freedAll;
+ }
+
+ private boolean freeBlock(ByteBuffer ib, int numPointers, boolean primary) {
+ for (int i = 0; i < numPointers; i++) {
+ int dataBlock = ib.getInt();
+ if (dataBlock == EMPTY_ADDRESS) {
+ return false;
}
- ByteBuffer bb = ByteBuffer.wrap(buf);
- bb.position(18);
- short blocks = bb.getShort();
- if (!fragmented) {
- for (int i = block + 1; i < blocks; i++) {
- inuse.set(i, false);
- }
+ if (primary) {
+ freeDataBlock(dataBlock);
} else {
- for (short i = 0; i < blocks; i++) {
- int toFree = getNextBlock(bb);
- inuse.set(toFree, false);
- }
+ freeIndirectBlock(dataBlock);
}
- } catch (IOException e) {
- throw new TeiidRuntimeException(e, "Could not read intial block to process
freeing " + oid); //$NON-NLS-1$
}
+ return true;
}
- @SuppressWarnings("unchecked")
- void add(final CacheEntry entry, final Serializer s) throws TeiidComponentException {
- lock.writeLock().lock();
+ @Override
+ public BlockInfo allocateBlock(int blockNum) {
+ int dataBlock = getOrUpdateDataBlockIndex(blockNum, EMPTY_ADDRESS, Mode.ALLOCATE);
+ BlockInfo bb = getPhysicalBlock(dataBlock, false, true);
+ putBlockId(blockNum, bb.buf);
+ bb.buf.position(0);
+ bb.buf.limit(BLOCK_DATA_BYTES);
+ return bb;
+ }
+
+ /**
+  * Writes the block trailer after the data bytes: gid, oid and the low three
+  * bytes of the block number.
+  */
+ private void putBlockId(int blockNum, ByteBuffer bb) {
+     bb.position(BLOCK_DATA_BYTES);
+     byte highByte = (byte)(blockNum >> 16);
+     short lowBytes = (short)blockNum;
+     bb.putLong(gid).putLong(oid).put(highByte).putShort(lowBytes);
+ }
+
+ }
+
+ private StorageManager storageManager;
+ private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
+ private int inodes;
+ private int blocks;
+ private int inodesPerBlock;
+ private int inodesInUseBlocks;
+ private int blocksInUseBlocks;
+ private FileStore store;
+ private FileStore systemStore;
+ private BlockBitSetTree blocksInuse;
+ private BlockBitSetTree inodesInuse;
+
+ //root directory
+ ConcurrentHashMap<Long, BlockClosedLongIntHashTable> physicalMapping = new
ConcurrentHashMap<Long, BlockClosedLongIntHashTable>();
+
+ //block caching
+ int blockCacheSize = 128; //2 MB
+ ReentrantLock blockLock = new ReentrantLock();
+ LinkedHashMap<Integer, BlockInfo> blockCache = new LinkedHashMap<Integer,
BlockInfo>() {
+ private static final long serialVersionUID = -4240291744435552008L;
+ BlockInfo eldestEntry = null;
+
+ protected boolean removeEldestEntry(Map.Entry<Integer,BlockInfo> eldest) {
+ if (size() > blockCacheSize) {
+ BlockInfo bi = eldest.getValue();
+ if (bi.dirty) {
+ eldestEntry = bi;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public BlockInfo put(Integer key, BlockInfo value) {
+ blockLock.lock();
+ value.dirty = true;
try {
- List<TryFreeParameter> toFree = Collections.emptyList();
- synchronized (freed) {
- if (!freed.isEmpty()) {
- toFree = new ArrayList<TryFreeParameter>(freed);
- freed.clear();
- }
+ return super.put(key, value);
+ } finally {
+ BlockInfo toUpdate = eldestEntry;
+ eldestEntry = null;
+ blockLock.unlock();
+ if (toUpdate != null) {
+ updatePhysicalBlockDirect(toUpdate);
}
- for (TryFreeParameter param : toFree) {
- free(param.gid, param.oid, param.block);
- }
- ExtensibleBufferedOutputStream fsos = new ExtensibleBufferedOutputStream(new
byte[IO_BUFFER_SIZE]) {
-
- byte[] firstBytes = null;
- List<Integer> blocks = new ArrayList<Integer>(8);
- boolean isFragmented;
- int lastCount;
- int start;
-
- @Override
- protected void flushDirect() throws IOException {
- lastCount = count;
- if (firstBytes == null) {
- start = nextBlock(-1);
- firstBytes = buf;
- //need a new buffer only if there could be bytes remaining
- if (count == buf.length) {
- buf = new byte[IO_BUFFER_SIZE];
- }
- } else {
- int last = -1;
- if (!blocks.isEmpty()) {
- last = blocks.get(blocks.size()-1);
- } else {
- last = start;
- }
- int next = nextBlock(last);
- blocks.add(next);
- if (next != last + 1) {
- isFragmented = true;
- }
- if (blocks.size() > MAX_BLOCKS) {
- //TODO handle this case
- throw new TeiidRuntimeException("Exceeded max persistent object size" +
entry.getId()); //$NON-NLS-1$
- }
- store.write(next << 14, buf, 0, count);
- }
- }
-
- @Override
- public void close() throws IOException {
- super.close();
- ByteBuffer bb = ByteBuffer.wrap(firstBytes);
- bb.putLong(s.getId());
- bb.putLong(entry.getId());
- bb.putShort((short)(lastCount - (blocks.isEmpty()?DATA_START:0)));
- bb.putShort((short)blocks.size());
- for (Integer i : blocks) {
- bb.put((byte)(i.intValue()>>16));
- bb.putShort((short)i.intValue());
- }
- store.write(start << 14, firstBytes, 0, firstBytes.length);
- if (LogManager.isMessageToBeRecorded(LogConstants.CTX_BUFFER_MGR,
MessageLevel.TRACE)) {
- LogManager.logTrace(LogConstants.CTX_BUFFER_MGR, id, entry.getId(), "batch
written starting at:", start); //$NON-NLS-1$
- }
- Map<Long, Integer> map = physicalMapping.get(s.getId());
- if (map == null) {
- return;
- }
- map.put(entry.getId(), start);
- inuse.set(start, true);
- for (Integer i : blocks) {
- inuse.set(i, true);
- }
- if (isFragmented) {
- fragmentedFlags.set(start, true);
- }
- }
-
- };
- fsos.count = DATA_START;
- ObjectOutputStream oos = new ObjectOutputStream(fsos);
- oos.writeInt(entry.getSizeEstimate());
- s.serialize(entry.getObject(), oos);
- oos.close();
- } catch (IOException e) {
- throw new TeiidComponentException(e);
- } finally {
- lock.writeLock().unlock();
}
-
- if (entry.getId().longValue() == 451) {
- get(entry.getId(), s);
- }
}
- int nextBlock(int fromIndex) {
- int next = inuse.nextClearBit(fromIndex + 1);
- if (next == -1) {
- throw new TeiidRuntimeException("no freespace available on segment" + id);
//$NON-NLS-1$
+ public BlockInfo get(Object key) {
+ blockLock.lock();
+ try {
+ return super.get(key);
+ } finally {
+ blockLock.unlock();
}
- return next;
}
- @SuppressWarnings("unchecked")
- CacheEntry get(Long oid, Serializer serializer) throws TeiidComponentException {
- lock.readLock().lock();
+ public BlockInfo remove(Object key) {
+ blockLock.lock();
try {
- Map<Long, Integer> map = physicalMapping.get(serializer.getId());
- if (map == null) {
- return null;
+ return super.remove(key);
+ } finally {
+ blockLock.unlock();
+ }
+ }
+ };
+
+ /**
+ * Returns the BlockInfo for the given block address, consulting the block cache
+ * first and reading the block from the data or system store on a miss.
+ *
+ * @param block non-negative block address within the selected store
+ * @param system true to use the system store, false for the data store
+ * @param allocate passed through to the store's getBuffer; a cache hit is not expected when set
+ * @throws AssertionError if block is negative
+ * @throws TeiidRuntimeException if the store raises an IOException
+ */
+ BlockInfo getPhysicalBlock(int block, boolean system, boolean allocate) {
+ if (block < 0) {
+ throw new AssertionError("invalid block address " + block); //$NON-NLS-1$
+ }
+ try {
+ int key = block;
+ if (system) {
+ key |= SYSTEM_MASK;
+ }
+ BlockInfo result = blockCache.get(key);
+ assert result == null || !allocate;
+ if (result == null) {
+ ByteBuffer bb = null;
+ //widen before shifting: an int shift overflows for block addresses >= 2^17
+ long start = ((long)block)<<LOG_BLOCK_SIZE;
+ if (system) {
+ bb = systemStore.getBuffer(start, BLOCK_SIZE, allocate);
+ } else {
+ bb = store.getBuffer(start, BLOCK_SIZE, allocate);
}
- final Integer startBlock = map.get(oid);
- if (startBlock == null) {
- return null;
+ result = new BlockInfo(system, bb, block, -1);
+ blockLock.lock();
+ try {
+ //another thread may have cached the block while it was being read
+ BlockInfo existing = blockCache.get(key);
+ if (existing != null) {
+ return existing;
+ }
+ blockCache.put(key, result);
+ } finally {
+ blockLock.unlock();
}
- ObjectInputStream ois = new ObjectInputStream(new InputStream() {
-
- int count;
- int pos;
- byte[] buf = new byte[IO_BUFFER_SIZE];
- ByteBuffer firstBytes;
- int currentBlock;
- short tailBytes;
- short totalBlocks;
- short blockNum;
-
- @Override
- public int read() throws IOException {
- if (pos == count) {
- if (firstBytes == null) {
- store.readFully(startBlock << 14, buf, 0, buf.length);
- firstBytes = ByteBuffer.wrap(buf);
- firstBytes.position(16);
- tailBytes = firstBytes.getShort();
- totalBlocks = firstBytes.getShort();
- count = buf.length;
- pos = DATA_START;
- } else {
- buf = new byte[IO_BUFFER_SIZE];
- //TODO: defrag on read
- if (count == buf.length) {
- currentBlock = getNextBlock(firstBytes);
- blockNum++;
- pos = 0;
- }
- int length = blockNum == totalBlocks?tailBytes:buf.length;
- store.readFully(currentBlock << 14, buf, 0, length);
- count = length;
- }
- }
- return buf[pos++] & 0xff;
+ return result;
+ }
+ return result;
+ } catch (IOException e) {
+ throw new TeiidRuntimeException(e);
+ }
+ }
+
+ void updatePhysicalBlock(BlockInfo bi) {
+ int key = bi.physicalAddress;
+ if (bi.system) {
+ key |= SYSTEM_MASK;
+ }
+ if (bi.inodeOffset >= 0) {
+ blockLock.lock();
+ try {
+ BlockInfo actual = blockCache.get(key);
+ if (actual == null) {
+ //we're not in the cache, so just update storage
+ updatePhysicalBlockDirect(bi);
+ } else {
+ //TODO: check to see we're sharing the same buffer
+ for (int i = 0; i < INODE_BYTES; i++) {
+ actual.buf.put(bi.inodeOffset + i, bi.buf.get(i));
}
- });
- CacheEntry ce = new CacheEntry(oid);
- ce.setSizeEstimate(ois.readInt());
- ce.setObject(serializer.deserialize(ois));
- ce.setPersistent(true);
- return ce;
- } catch(IOException e) {
- throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", id));
//$NON-NLS-1$
- } catch (ClassNotFoundException e) {
- throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", id));
//$NON-NLS-1$
- } finally {
- lock.readLock().unlock();
- }
+ }
+ } finally {
+ blockLock.unlock();
+ }
+ return;
}
+ blockCache.put(key, bi);
}
+
+ /**
+  * Writes the block's buffer straight to its backing store, bypassing the block cache.
+  *
+  * @param bi the block to write; its buffer is rewound before the write
+  * @throws TeiidRuntimeException if the store raises an IOException
+  */
+ private void updatePhysicalBlockDirect(BlockInfo bi) {
+     //widen before shifting: an int shift overflows for block addresses >= 2^17
+     long start = ((long)bi.physicalAddress) << LOG_BLOCK_SIZE;
+     //NOTE(review): for an inode slice (inodeOffset >= 0) the bytes are written at the
+     //start of the block rather than at start + inodeOffset - confirm this is intended
+     try {
+         bi.buf.rewind();
+         if (!bi.system) {
+             store.updateFromBuffer(bi.buf, start);
+         } else {
+             systemStore.updateFromBuffer(bi.buf, start);
+         }
+     } catch (IOException e) {
+         throw new TeiidRuntimeException(e);
+     }
+ }
- static int getNextBlock(ByteBuffer bb) {
- int block = (bb.get() & 0xff)<< 16;
- block += (bb.getShort() & 0xffff);
- return block;
+ InodeBlockManager getBlockManager(long gid, long oid, int inode) {
+ return new InodeBlockManager(gid, oid, inode);
}
-
- private StorageManager storageManager;
- private Segment[] segments;
- private ConcurrentHashMap<Long, Map<Long, Integer>> physicalMapping = new
ConcurrentHashMap<Long, Map<Long,Integer>>();
- private int segmentCount = 32;
-
+
+ @SuppressWarnings("unchecked")
@Override
- public void add(CacheEntry entry, Serializer<?> s) {
- Segment seg = getSegment(entry.getId());
+ public void add(CacheEntry entry, Serializer s) {
+ boolean success = false;
+ InodeBlockManager blockManager = null;
try {
- seg.add(entry, s);
+ BlockClosedLongIntHashTable map = physicalMapping.get(s.getId());
+ if (map == null) {
+ return;
+ }
+ blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
+ blockManager.trackLast = true;
+ ExtensibleBufferedOutputStream fsos = new BlockOutputStream(blockManager);
+ ObjectOutputStream oos = new ObjectOutputStream(fsos);
+ oos.writeInt(entry.getSizeEstimate());
+ s.serialize(entry.getObject(), oos);
+ oos.close();
+ map.put(entry.getId(), blockManager.getInode());
+ success = true;
} catch (Throwable e) {
LogManager.logError(LogConstants.CTX_BUFFER_MGR, e, "Error persisting batch,
attempts to read batch "+ entry.getId() +" later will result in an
exception"); //$NON-NLS-1$ //$NON-NLS-2$
+ } finally {
+ if (!success && blockManager != null) {
+ blockManager.free();
+ }
}
}
-
+
@Override
- public CacheEntry get(Long id, Serializer<?> serializer) throws
TeiidComponentException {
- Segment seg = getSegment(id);
- return seg.get(id, serializer);
+ public CacheEntry get(Long oid, Serializer<?> serializer) throws
TeiidComponentException {
+ try {
+ BlockClosedLongIntHashTable map = physicalMapping.get(serializer.getId());
+ if (map == null) {
+ return null;
+ }
+ final int inode = map.get(oid);
+ if (inode == EMPTY_ADDRESS) {
+ return null;
+ }
+ final BlockManager manager = getBlockManager(serializer.getId(), oid, inode);
+ ObjectInputStream ois = new ObjectInputStream(new InputStream() {
+
+ int blockIndex;
+ BlockInfo buf;
+
+ @Override
+ public int read() throws IOException {
+ ensureBytes();
+ return buf.buf.get() & 0xff;
+ }
+
+ private void ensureBytes() {
+ if (buf == null || buf.buf.remaining() == 0) {
+ if (buf != null) {
+ buf = null;
+ }
+ buf = manager.getBlock(blockIndex++);
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len)
+ throws IOException {
+ ensureBytes();
+ len = Math.min(len, buf.buf.remaining());
+ buf.buf.get(b, off, len);
+ return len;
+ }
+ });
+ CacheEntry ce = new CacheEntry(oid);
+ ce.setSizeEstimate(ois.readInt());
+ ce.setObject(serializer.deserialize(ois));
+ ce.setPersistent(true);
+ return ce;
+ } catch(IOException e) {
+ throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
+ } catch (ClassNotFoundException e) {
+ throw new TeiidComponentException(e,
QueryPlugin.Util.getString("FileStoreageManager.error_reading", oid));
//$NON-NLS-1$
+ }
}
- private Segment getSegment(Long id) {
- return segments[id.hashCode() & (segments.length - 1)];
- }
-
@Override
public FileStore createFileStore(String name) {
return storageManager.createFileStore(name);
@@ -350,62 +653,76 @@
@Override
public void initialize() throws TeiidComponentException {
storageManager.initialize();
- int numSegments = 31 - Integer.numberOfLeadingZeros(this.segmentCount);
- segments = new Segment[1 << numSegments];
- for (int i = 0; i < segments.length; i++) {
- segments[i] = new Segment(i);
- segments[i].store = storageManager.createFileStore("segment_" + i);
//$NON-NLS-1$
- }
+ int logSpace = Math.min(45, 63 - Long.numberOfLeadingZeros(this.maxBufferSpace));
+
+ blocks = 1 << Math.min(Math.max(12, logSpace -LOG_BLOCK_SIZE +1), ADDRESS_BITS);
//blocks per segment
+
+ inodes = blocks>>1;
+ inodesPerBlock = BLOCK_SIZE/INODE_BYTES;
+
+ inodesInUseBlocks = computeInuseBlocks(inodes);
+ blocksInUseBlocks = computeInuseBlocks(blocks);
+
+ inodesInuse = new BlockBitSetTree(inodes - 1, new BitSetBlockManager(0));
+ blocksInuse = new BlockBitSetTree(blocks - 1, new
BitSetBlockManager(inodesInUseBlocks));
+
+ store = storageManager.createFileStore("data_store"); //$NON-NLS-1$
+ systemStore = storageManager.createFileStore("system_store"); //$NON-NLS-1$
}
+
+ /**
+  * Computes the number of blocks needed to hold the given number of bits,
+  * rounding up at each step.
+  */
+ static int computeInuseBlocks(int number) {
+     //groups of BLOCK_SIZE bits, rounded up
+     int blockCount = number >> LOG_BLOCK_SIZE;
+     if ((number & BLOCK_MASK) > 0) {
+         blockCount++;
+     }
+     //eight such groups fill one block of BLOCK_SIZE bytes, rounded up
+     int result = blockCount >> 3;
+     if ((blockCount & 7) > 0) {
+         result++;
+     }
+     return result;
+ }
@Override
public void addToCacheGroup(Long gid, Long oid) {
- Map<Long, Integer> map = physicalMapping.get(gid);
+ BlockClosedLongIntHashTable map = physicalMapping.get(gid);
if (map == null) {
return;
}
- map.put(oid, null);
+ map.put(oid, EMPTY_ADDRESS);
}
@Override
public void createCacheGroup(Long gid) {
- physicalMapping.put(gid, Collections.synchronizedMap(new HashMap<Long,
Integer>()));
+ BlockClosedLongIntHashTable map = new BlockClosedLongIntHashTable(getBlockManager(gid,
-1, EMPTY_ADDRESS));
+ physicalMapping.put(gid, map);
}
@Override
public void remove(Long gid, Long id) {
- Map<Long, Integer> map = physicalMapping.get(gid);
+ BlockClosedLongIntHashTable map = physicalMapping.get(gid);
if (map == null) {
return;
}
- Integer block = map.remove(id);
- if (block != null) {
- Segment s = getSegment(id);
- s.tryFree(gid, id, block);
- }
+ int inode = map.remove(id);
+ free(gid, id, inode);
}
-
+
@Override
public Collection<Long> removeCacheGroup(Long gid) {
- Map<Long, Integer> values = physicalMapping.remove(gid);
- if (values == null) {
+ BlockClosedLongIntHashTable map = physicalMapping.remove(gid);
+ if (map == null) {
return Collections.emptySet();
}
- synchronized (values) {
- for (Map.Entry<Long, Integer> entry : values.entrySet()) {
- if (entry.getValue() != null) {
- Segment s = getSegment(entry.getKey());
- s.tryFree(gid, entry.getKey(), entry.getValue());
- }
+ Map<Long, Integer> values = map.remove();
+ for (Map.Entry<Long, Integer> entry : values.entrySet()) {
+ if (entry.getValue() != null) {
+ free(gid, entry.getKey(), entry.getValue());
}
- return new HashSet<Long>(values.keySet());
}
+ return values.keySet();
}
- public void setSegmentCount(int segmentCount) {
- this.segmentCount = segmentCount;
+ void free(Long gid, Long oid, int inode) {
+ if (inode == EMPTY_ADDRESS) {
+ return;
+ }
+ BlockManager bm = getBlockManager(gid, oid, inode);
+ bm.free();
}
-
+
public void setStorageManager(StorageManager storageManager) {
this.storageManager = storageManager;
}
@@ -414,4 +731,16 @@
return storageManager;
}
+ public void setMaxBufferSpace(long maxBufferSpace) {
+ this.maxBufferSpace = maxBufferSpace;
+ }
+
+ public int getInodesInUse() {
+ return this.inodesInuse.getBitsSet();
+ }
+
+ public int getDataBlocksInUse() {
+ return this.blocksInuse.getBitsSet();
+ }
+
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/MemoryStorageManager.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -22,6 +22,7 @@
package org.teiid.common.buffer.impl;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
@@ -86,6 +87,20 @@
public synchronized long getLength() {
return buffer.limit();
}
+
+ @Override
+ public synchronized ByteBuffer getBuffer(long start, int length, boolean allocate) { // returns a view of [start, start+length); 'allocate' is not consulted here
+ int position = (int)start; // narrowing cast: assumes start fits in an int - TODO confirm against MAX_FILE_SIZE
+ buffer.limit(position + length); // moves the limit of the store's own buffer, which is the value getLength() returns
+ buffer.position(position);
+ return buffer.slice(); // the slice shares content with 'buffer', so writes through it land directly in this store
+ }
+
+ @Override
+ public void updateFromBuffer(ByteBuffer bb, long start)
+ throws IOException {
+ // nothing to copy back: getBuffer returns a slice of this store's own buffer, so the bytes are already shared
+ }
}
@@ -134,7 +149,7 @@
@Override
public CacheEntry get(Long id, Serializer<?> serializer)
throws TeiidComponentException {
- Map<Long, CacheEntry> group = groups.get(id);
+ Map<Long, CacheEntry> group = groups.get(serializer.getId());
if (group != null) {
return group.get(id);
}
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java
===================================================================
---
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/engine/src/main/java/org/teiid/common/buffer/impl/SplittableStorageManager.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -23,6 +23,7 @@
package org.teiid.common.buffer.impl;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -30,6 +31,13 @@
import org.teiid.common.buffer.StorageManager;
import org.teiid.core.TeiidComponentException;
+/**
+ * A storage manager that combines smaller files into a larger
+ * logical file.
+ *
+ * The buffer methods assume that buffers cannot go beyond single
+ * file boundaries.
+ */
public class SplittableStorageManager implements StorageManager {
public static final long DEFAULT_MAX_FILESIZE = 2 * 1024l;
@@ -82,7 +90,7 @@
ensureLength(fileOffset + length);
store = storageFiles.get((int)(fileOffset/maxFileSize));
}
- long fileBegin = (int)(fileOffset%maxFileSize);
+ long fileBegin = fileOffset%maxFileSize;
length = Math.min(length, (int)Math.min(Integer.MAX_VALUE, maxFileSize -
fileBegin));
store.write(fileBegin, b, offSet, length);
return length;
@@ -109,6 +117,28 @@
}
len = length;
}
+
+ @Override
+ public ByteBuffer getBuffer(long start, int length, boolean allocate) throws
IOException { // maps a logical offset onto one backing file and returns that file's buffer
+ FileStore store = null;
+ synchronized (this) { // storageFiles is only touched while holding this manager's monitor
+ ensureLength(start + length); // NOTE(review): presumably grows the backing files to cover start+length - confirm
+ store = storageFiles.get((int)(start/maxFileSize)); // index of the backing file that contains 'start'
+ }
+ long fileBegin = start%maxFileSize; // offset of 'start' inside that file
+ return store.getBuffer(fileBegin, length, allocate); // length is passed through unclipped, so the request must not cross a file boundary
+ }
+
+ @Override
+ public void updateFromBuffer(ByteBuffer bb, long start)
+ throws IOException { // forwards the update to the backing file that contains logical offset 'start'
+ FileStore store = null;
+ synchronized (this) {
+ store = storageFiles.get((int)(start/maxFileSize)); // unlike getBuffer there is no ensureLength call: the file for 'start' is expected to exist already
+ }
+ long fileBegin = start%maxFileSize; // offset of 'start' inside that file
+ store.updateFromBuffer(bb, fileBegin);
+ }
@Override
public synchronized void setLength(long length) throws IOException {
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/BufferManagerFactory.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -86,7 +86,7 @@
SplittableStorageManager ssm = new SplittableStorageManager(storageManager);
ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
FileStoreCache fsc = new FileStoreCache();
- fsc.setSegmentCount(1);
+ fsc.setMaxBufferSpace(Runtime.getRuntime().maxMemory()/4);
fsc.setStorageManager(ssm);
fsc.initialize();
bufferManager.setCache(fsc);
Deleted: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java 2011-09-23
04:11:11 UTC (rev 3503)
+++ trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -1,60 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
- package org.teiid.common.buffer.impl;
-
-import static org.junit.Assert.*;
-
-import java.util.BitSet;
-import java.util.Random;
-
-import org.junit.Test;
-
-public class TestBitSetTree {
-
- @Test public void testBitsSet() {
- BitSetTree bst = new BitSetTree();
- bst.set(1, true);
- bst.set(100, true);
- bst.set(10000, true);
- bst.set(1000000, true);
- assertEquals(4, bst.getBitsSet());
- }
-
- @Test public void testNextClearSet() {
- BitSetTree bst = new BitSetTree();
- BitSet bst1 = new BitSet();
- Random r = new Random(1);
- for (int i = 0; i < 1000; i++) {
- int rand = r.nextInt() & BitSetTree.MAX_INDEX;
- bst.set(rand, true);
- bst1.set(rand, true);
- }
-
- for (int i = 0; i < 10000; i++) {
- int rand = r.nextInt() & BitSetTree.MAX_INDEX;
- assertEquals(bst1.nextClearBit(rand), bst.nextClearBit(rand));
- assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
- }
- }
-
-}
Copied: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
(from rev 3502,
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBitSetTree.java)
===================================================================
--- trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
(rev 0)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -0,0 +1,66 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+ package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import java.util.BitSet;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class TestBlockBitSetTree { // checks BlockBitSetTree, backed by the in-memory DummyBlockManager, against java.util.BitSet
+
+ @Test public void testBitsSet() { // getBitsSet() must follow both setting and clearing of individual bits
+ BlockBitSetTree bst = new BlockBitSetTree((1 << 20) -1, new
TestBlockClosedLongIntHashTable.DummyBlockManager()); // (1 << 20) - 1 = 1048575; presumably the largest usable index - confirm
+ bst.set(1, true);
+ bst.set(100, true);
+ bst.set(10000, true);
+ bst.set(1000000, true);
+ assertEquals(4, bst.getBitsSet());
+ bst.set(1, false); // clearing a bit must lower the count and be visible through get()
+ assertEquals(3, bst.getBitsSet());
+ assertFalse(bst.get(1));
+ }
+
+ @Test public void testNextClearSet() { // nextSetBit/nextClearBit must agree with a reference BitSet
+ BlockBitSetTree bst = new BlockBitSetTree((1 << 20) -1, new
TestBlockClosedLongIntHashTable.DummyBlockManager());
+ BitSet bst1 = new BitSet(); // reference implementation
+ Random r = new Random(1); // fixed seed keeps the sequence reproducible
+ for (int i = 0; i < 1000; i++) {
+ int rand = r.nextInt() & bst.getMaxIndex(); // the mask yields a non-negative index no larger than getMaxIndex()
+ bst.set(rand, true);
+ bst1.set(rand, true);
+ assertTrue(bst.get(rand));
+ assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
+ assertEquals(String.valueOf(i), bst1.nextSetBit(rand), bst.nextSetBit(rand)); // NOTE(review): same check as the previous line, plus the iteration number as failure message
+ }
+
+ for (int i = 0; i < 10000; i++) { // probe fresh random positions once the 1000 bits are in place
+ int rand = r.nextInt() & bst.getMaxIndex();
+ assertEquals(bst1.nextClearBit(rand), bst.nextClearBit(rand));
+ assertEquals(bst1.nextSetBit(rand), bst.nextSetBit(rand));
+ }
+ }
+
+}
Property changes on:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockBitSetTree.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Added:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
(rev 0)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * See the COPYRIGHT.txt file distributed with this work for information
+ * regarding copyright ownership. Some portions may be licensed
+ * to Red Hat, Inc. under one or more contributor license agreements.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+ * 02110-1301 USA.
+ */
+
+package org.teiid.common.buffer.impl;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Test;
+import org.teiid.common.buffer.impl.FileStoreCache.BlockInfo;
+
+public class TestBlockClosedLongIntHashTable { // checks BlockClosedLongIntHashTable against java.util.HashMap
+
+ public static final class DummyBlockManager implements BlockManager { // heap-backed BlockManager stub, also used by TestBlockBitSetTree
+ List<BlockInfo> blocks = new ArrayList<BlockInfo>(); // block i lives at list position i
+
+ @Override
+ public int getInode() { // this stub always reports inode 0
+ return 0;
+ }
+
+ @Override
+ public void updateBlock(BlockInfo block) { // intentionally a no-op in this stub
+
+ }
+
+ @Override
+ public void free() { // drops every block at once
+ blocks.clear();
+ }
+
+ @Override
+ public BlockInfo getBlock(int index) {
+ BlockInfo block = blocks.get(index);
+ block.buf.rewind(); // hand the block out with its buffer position reset to 0
+ return block;
+ }
+
+ @Override
+ public void freeBlock(int index) {
+ blocks.remove(index); // List.remove(int) shifts any later blocks down by one position
+ }
+
+ @Override
+ public BlockInfo allocateBlock(int index) {
+ assertEquals(index, blocks.size()); // blocks may only be appended, in index order
+ ByteBuffer result = ByteBuffer.wrap(new byte[FileStoreCache.BLOCK_SIZE]); // fresh zero-filled heap buffer of one block
+ blocks.add(new BlockInfo(false, result, index, -1)); // NOTE(review): meaning of the 'false' and '-1' arguments is defined by FileStoreCache.BlockInfo - confirm
+ return blocks.get(blocks.size() - 1);
+ }
+ }
+
+ @Test public void testAgainstHashMap() { // remove() must report presence exactly like a HashMap holding the same entries
+ BlockClosedLongIntHashTable table = new BlockClosedLongIntHashTable(new
DummyBlockManager());
+ HashMap<Long, Integer> table1 = new HashMap<Long, Integer>(16); // reference map
+ for (long i = 1; i < 200000; i++) { // keys 1..199999, each mapped to its own value as an int
+ table.put(i, (int)i);
+ table1.put(i, (int)i);
+ }
+ Random r = new Random(0); // fixed seed keeps the removal sequence reproducible
+ for (int i = 1; i < 2000000; i++) {
+ long toRemove = r.nextInt(i); // uniform in [0, i): hits present, already-removed and never-inserted keys
+ boolean removed = table.remove(toRemove) != BlockClosedLongIntHashTable.EMPTY; // EMPTY is taken here as the "key was absent" result
+ assertEquals(table1.remove(toRemove) != null, removed);
+ }
+ }
+
+}
Property changes on:
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestBlockClosedLongIntHashTable.java
___________________________________________________________________
Added: svn:mime-type
+ text/plain
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestFileStoreCache.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -27,51 +27,93 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.util.Arrays;
-import java.util.List;
import org.junit.Test;
import org.teiid.common.buffer.CacheEntry;
import org.teiid.common.buffer.Serializer;
+import org.teiid.core.util.UnitTestUtil;
public class TestFileStoreCache {
+ private final class SimpleSerializer implements Serializer<Integer> { // test serializer: an Integer n is written as n followed by the ints 0..n-1
+ @Override
+ public Integer deserialize(ObjectInputStream ois)
+ throws IOException, ClassNotFoundException {
+ Integer result = ois.readInt(); // leading count n
+ for (int i = 0; i < result; i++) {
+ assertEquals(i, ois.readInt()); // every payload int must come back in order, so misplaced or corrupted data fails the test
+ }
+ return result;
+ }
+
+ @Override
+ public Long getId() { // the test uses this id as the cache group id (createCacheGroup(s.getId()) / removeCacheGroup(1l))
+ return 1l;
+ }
+
+ @Override
+ public void serialize(Integer obj, ObjectOutputStream oos)
+ throws IOException {
+ oos.writeInt(obj); // count first, then obj ints of payload: the serialized size grows with obj
+ for (int i = 0; i < obj; i++) {
+ oos.writeInt(i);
+ }
+ }
+
+ @Override
+ public boolean useSoftCache() {
+ return false;
+ }
+ }
+
@Test public void testAddGetMultiBlock() throws Exception {
FileStoreCache fsc = new FileStoreCache();
- fsc.setStorageManager(new MemoryStorageManager());
+ fsc.setMaxBufferSpace(1 << 28);
+ SplittableStorageManager ssm = new SplittableStorageManager(new
MemoryStorageManager());
+ ssm.setMaxFileSizeDirect(MemoryStorageManager.MAX_FILE_SIZE);
+ fsc.setStorageManager(ssm);
fsc.initialize();
+ UnitTestUtil.enableTraceLogging("org.teiid"); //$NON-NLS-1$
+
CacheEntry ce = new CacheEntry(2l);
- Serializer<Object> s = new Serializer<Object>() {
- @Override
- public Object deserialize(ObjectInputStream ois)
- throws IOException, ClassNotFoundException {
- return ois.readObject();
- }
-
- @Override
- public Long getId() {
- return 1l;
- }
-
- @Override
- public void serialize(Object obj, ObjectOutputStream oos)
- throws IOException {
- oos.writeObject(obj);
- }
-
- @Override
- public boolean useSoftCache() {
- return false;
- }
- };
+ Serializer<Integer> s = new SimpleSerializer();
fsc.createCacheGroup(s.getId());
- List<Object> cacheObject = Arrays.asList(new Object[10000]);
+ Integer cacheObject = Integer.valueOf(2);
ce.setObject(cacheObject);
fsc.add(ce, s);
ce = fsc.get(2l, s);
assertEquals(cacheObject, ce.getObject());
+
+ //test something that exceeds the direct inode data blocks
+ ce = new CacheEntry(3l);
+ cacheObject = Integer.valueOf(80000);
+ ce.setObject(cacheObject);
+ fsc.add(ce, s);
+
+ ce = fsc.get(3l, s);
+ assertEquals(cacheObject, ce.getObject());
+
+ fsc.removeCacheGroup(1l);
+
+ assertEquals(0, fsc.getDataBlocksInUse());
+ assertEquals(0, fsc.getInodesInUse());
+
+ //test something that exceeds the indirect inode data blocks
+ ce = new CacheEntry(3l);
+ fsc.createCacheGroup(s.getId());
+ cacheObject = Integer.valueOf(5000000);
+ ce.setObject(cacheObject);
+ fsc.add(ce, s);
+
+ ce = fsc.get(3l, s);
+ assertEquals(cacheObject, ce.getObject());
+
+ fsc.removeCacheGroup(1l);
+
+ assertEquals(0, fsc.getDataBlocksInUse());
+ assertEquals(0, fsc.getInodesInUse());
}
}
Modified: trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java
===================================================================
---
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/engine/src/test/java/org/teiid/common/buffer/impl/TestSizeUtility.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -98,18 +98,6 @@
helpTestGetSize("abcdefghij", 64); //$NON-NLS-1$
}
- public void XtestGetSizeLongString() {
- // There is no clear way of figuring out the actual size of a string that is
created
- // from a StringBuffer because the buffer can sometimes be twice as big as the
actual length of the string
- // Since the data comin from the connector is not created this way, this test is
an inaccurate setup
- int size = 10000;
- StringBuffer str = new StringBuffer();
- for(int i=0; i<size; i++) {
- str.append("a"); //$NON-NLS-1$
- }
- helpTestGetSize(str.toString(), size+3);
- }
-
@Test public void testGetSizeRow1() {
List<Object> row = new ArrayList<Object>(1);
row.add(new Integer(0));
Modified: trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java
===================================================================
--- trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-09-23
04:11:11 UTC (rev 3503)
+++ trunk/runtime/src/main/java/org/teiid/services/BufferServiceImpl.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -66,7 +66,7 @@
private long maxFileSize = SplittableStorageManager.DEFAULT_MAX_FILESIZE; // 2GB
private int maxProcessingKb = BufferManager.DEFAULT_MAX_PROCESSING_KB;
private int maxReserveKb = BufferManager.DEFAULT_RESERVE_BUFFER_KB;
- private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE;
+ private long maxBufferSpace = FileStorageManager.DEFAULT_MAX_BUFFERSPACE>>20;
private boolean inlineLobs = true;
private FileStorageManager fsm;
@@ -108,6 +108,8 @@
ssm.setMaxFileSize(maxFileSize);
FileStoreCache fsc = new FileStoreCache();
fsc.setStorageManager(ssm);
+ fsc.setMaxBufferSpace(maxBufferSpace*MB);
+ fsc.initialize();
this.bufferMgr.setCache(fsc);
} else {
this.bufferMgr.setCache(new MemoryStorageManager());
Modified:
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java
===================================================================
---
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-09-23
04:11:11 UTC (rev 3503)
+++
trunk/test-integration/common/src/test/java/org/teiid/systemmodel/TestMatViewReplication.java 2011-09-28
03:15:08 UTC (rev 3504)
@@ -30,9 +30,6 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.jgroups.JChannelFactory;
import org.junit.BeforeClass;
@@ -58,16 +55,7 @@
@Test public void testReplication() throws Exception {
if (DEBUG) {
- Logger logger = Logger.getLogger("org.teiid");
- logger.setLevel(Level.FINEST);
- for (Handler h : logger.getHandlers()) {
- h.setLevel(Level.FINEST);
- }
- /*org.apache.log4j.Logger l = LogManager.getLogger("org.jgroups");
- l.setLevel(org.apache.log4j.Level.TRACE);
- ConsoleAppender ca = new ConsoleAppender(new PatternLayout());
- ca.setName("x");
- l.addAppender(ca);*/
+ UnitTestUtil.enableTraceLogging("org.teiid");
}
FakeServer server1 = createServer();