Author: shawkins
Date: 2012-02-13 10:18:20 -0500 (Mon, 13 Feb 2012)
New Revision: 3866
Removed:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
branches/7.7.x/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
Log:
TEIID-1932 adding tests for serialization and removing custom objectoutput
Modified:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2012-02-10 22:32:36 UTC (rev 3865)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java 2012-02-13 15:18:20 UTC (rev 3866)
@@ -26,7 +26,9 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInput;
+import java.io.ObjectInputStream;
import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -591,7 +593,7 @@
hasPermit = true;
blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
BlockOutputStream bos = new BlockOutputStream(blockManager, memoryBlocks);
- ObjectOutput dos = new DataObjectOutputStream(bos);
+ ObjectOutput dos = new ObjectOutputStream(bos);
dos.writeLong(s.getId());
dos.writeLong(entry.getId());
dos.writeInt(entry.getSizeEstimate());
@@ -719,7 +721,7 @@
if (lock != null) {
is = readIntoMemory(info, is, lock, memoryBlocks);
}
- ObjectInput dis = new DataObjectInputStream(is);
+ ObjectInput dis = new ObjectInputStream(is);
dis.readFully(HEADER_SKIP_BUFFER);
int sizeEstimate = dis.readInt();
CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), sizeEstimate,
serializer.deserialize(dis), ref, true);
Deleted:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java 2012-02-10 22:32:36 UTC (rev 3865)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java 2012-02-13 15:18:20 UTC (rev 3866)
@@ -1,105 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
-import java.io.StreamCorruptedException;
-
-public class DataObjectInputStream extends DataInputStream implements ObjectInput {
-
- ObjectInput ois;
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-
- public DataObjectInputStream(InputStream in) {
- super(in);
- }
-
- @Override
- public Object readObject() throws ClassNotFoundException, IOException {
- if (ois == null) {
- ois = new ObjectInputStream(this) {
-
- @Override
- protected void readStreamHeader() throws IOException,
- StreamCorruptedException {
- int version = readByte() & 0xFF;
- if (version != STREAM_VERSION) {
- throw new StreamCorruptedException("Unsupported version: " + version); //$NON-NLS-1$
- }
- }
-
- @Override
- protected ObjectStreamClass readClassDescriptor()
- throws IOException, ClassNotFoundException {
- int type = read();
- if (type < 0) {
- throw new EOFException();
- }
- switch (type) {
- case DataObjectOutputStream.TYPE_FAT_DESCRIPTOR:
- return super.readClassDescriptor();
- case DataObjectOutputStream.TYPE_THIN_DESCRIPTOR:
- String className = readUTF();
- Class<?> clazz = loadClass(className);
- return ObjectStreamClass.lookup(clazz);
- default:
- className = DataObjectOutputStream.typeMapping.get((byte)type);
- if (className == null) {
- throw new StreamCorruptedException("Unknown class type " + type); //$NON-NLS-1$
- }
- clazz = loadClass(className);
- return ObjectStreamClass.lookup(clazz);
- }
- }
-
- @Override
- protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
- String className = desc.getName();
- try {
- return loadClass(className);
- } catch (ClassNotFoundException ex) {
- return super.resolveClass(desc);
- }
- }
-
- protected Class<?> loadClass(String className) throws ClassNotFoundException {
- Class<?> clazz;
- if (classLoader != null) {
- clazz = classLoader.loadClass(className);
- } else {
- clazz = Class.forName(className);
- }
- return clazz;
- }
- };
- }
- return ois.readObject();
- }
-
-}
Deleted:
branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java 2012-02-10 22:32:36 UTC (rev 3865)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java 2012-02-13 15:18:20 UTC (rev 3866)
@@ -1,99 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership. Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import java.io.OutputStream;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Extends the logic of Netty's CompactObjectOutputStream to use byte identifiers
- * for some classes and to write/flush objects directly into the output.
- *
- * We can do this since buffer serialized data is ephemeral and good only for
- * a single process.
- */
-public class DataObjectOutputStream extends DataOutputStream implements ObjectOutput {
-
- private static final int MAX_BYTE_IDS = 254;
- static AtomicInteger counter = new AtomicInteger(2);
- static final ConcurrentHashMap<String, Byte> knownClasses = new ConcurrentHashMap<String, Byte>();
- static final ConcurrentHashMap<Byte, String> typeMapping = new ConcurrentHashMap<Byte, String>();
-
- static final int TYPE_FAT_DESCRIPTOR = 0;
- static final int TYPE_THIN_DESCRIPTOR = 1;
-
- ObjectOutputStream oos;
-
- public DataObjectOutputStream(OutputStream out) {
- super(out);
- }
-
- @Override
- public void writeObject(Object obj) throws IOException {
- if (oos == null) {
- oos = new ObjectOutputStream(this) {
- @Override
- protected void writeStreamHeader() throws IOException {
- writeByte(STREAM_VERSION);
- }
-
- @Override
- protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
- Class<?> clazz = desc.forClass();
- if (clazz.isPrimitive() || clazz.isArray()) {
- write(TYPE_FAT_DESCRIPTOR);
- super.writeClassDescriptor(desc);
- } else {
- String name = desc.getName();
- Byte b = knownClasses.get(name);
- if (b == null && counter.get() < MAX_BYTE_IDS) {
- synchronized (DataObjectOutputStream.class) {
- b = knownClasses.get(name);
- if (b == null && counter.get() < 254) {
- b = (byte)counter.getAndIncrement();
- knownClasses.put(name, b);
- typeMapping.put(b, name);
- }
- }
- }
- if (b != null) {
- write(b);
- } else {
- write(TYPE_THIN_DESCRIPTOR);
- writeUTF(name);
- }
- }
- }
- };
- }
- oos.writeObject(obj);
- oos.flush();
- }
-
-}
Modified:
branches/7.7.x/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
===================================================================
--- branches/7.7.x/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java 2012-02-10 22:32:36 UTC (rev 3865)
+++ branches/7.7.x/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java 2012-02-13 15:18:20 UTC (rev 3866)
@@ -25,7 +25,13 @@
import static org.junit.Assert.*;
import static org.teiid.query.processor.TestProcessor.*;
+import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Date;
+import java.sql.Time;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -41,6 +47,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.teiid.api.exception.query.QueryParserException;
+import org.teiid.client.BatchSerializer;
import org.teiid.common.buffer.BlockedException;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.TupleBatch;
@@ -51,6 +58,7 @@
import org.teiid.core.TeiidException;
import org.teiid.core.TeiidProcessingException;
import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.AccessibleByteArrayOutputStream;
import org.teiid.core.util.UnitTestUtil;
import org.teiid.query.metadata.QueryMetadataInterface;
import org.teiid.query.optimizer.capabilities.CapabilitiesFinder;
@@ -371,7 +379,71 @@
@Test public void runLike_16() throws Exception {
helpTestLike(50000, 16);
}
+
+ @Test public void runBatchSerialization_String() throws Exception {
+ String[] types = new String[] {DataTypeManager.DefaultDataTypes.STRING};
+ int size = 1024;
+
+ final List<List<?>> batch = new ArrayList<List<?>>();
+ for (int i = 0; i < size; i++) {
+ batch.add(Arrays.asList(String.valueOf(i)));
+ }
+ helpTestBatchSerialization(types, batch, 50000, 2);
+ }
+
+ @Test public void runBatchSerialization_Time() throws Exception {
+ final String[] types = new String[] {DataTypeManager.DefaultDataTypes.TIME};
+ int size = 1024;
+
+ final List<List<?>> batch = new ArrayList<List<?>>();
+ for (int i = 0; i < size; i++) {
+ batch.add(Arrays.asList(new Time(i)));
+ }
+ helpTestBatchSerialization(types, batch, 50000, 2);
+ }
+
+ @Test public void runBatchSerialization_Date() throws Exception {
+ final String[] types = new String[] {DataTypeManager.DefaultDataTypes.DATE};
+ int size = 1024;
+
+ final List<List<?>> batch = new ArrayList<List<?>>();
+ for (int i = 0; i < size; i++) {
+ batch.add(Arrays.asList(new Date(i)));
+ }
+ helpTestBatchSerialization(types, batch, 50000, 2);
+ }
+
+ private void helpTestBatchSerialization(final String[] types,
+ final List<List<?>> batch, int iterations, int threadCount)
+ throws InterruptedException, Exception {
+ runTask(iterations, threadCount, new Task() {
+
+ @Override
+ public Void call() throws Exception {
+ writeReadBatch(types, batch);
+ return null;
+ }
+ });
+ }
+ private List<List<Object>> writeReadBatch(String[] types, List<List<?>> batch)
+ throws IOException, ClassNotFoundException {
+ AccessibleByteArrayOutputStream baos = new AccessibleByteArrayOutputStream(5000);
+ ObjectOutputStream out = new ObjectOutputStream(baos);
+ BatchSerializer.writeBatch(out, types, batch);
+ out.flush();
+
+ byte[] bytes = baos.getBuffer();
+
+ ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytes, 0, baos.getCount());
+ ObjectInputStream in = new ObjectInputStream(bytesIn);
+ List<List<Object>> newBatch = BatchSerializer.readBatch(in, types);
+ out.close();
+ in.close();
+ assertEquals(batch.size(), newBatch.size());
+ return newBatch;
+ }
+
private void helpTestLike(int iterations, int threads) throws QueryParserException, InterruptedException, Exception {
final Expression ex = QueryParser.getQueryParser().parseExpression("'abcdefg' like 'a%g'");