[teiid-commits] teiid SVN: r3866 - in branches/7.7.x: test-integration/perf/src/test/java/org/teiid/query/eval and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Mon Feb 13 10:18:21 EST 2012


Author: shawkins
Date: 2012-02-13 10:18:20 -0500 (Mon, 13 Feb 2012)
New Revision: 3866

Removed:
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
Modified:
   branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
   branches/7.7.x/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
Log:
TEIID-1932 adding tests for batch serialization and removing the custom ObjectInput/ObjectOutput stream implementations (DataObjectInputStream, DataObjectOutputStream)

Modified: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2012-02-10 22:32:36 UTC (rev 3865)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/BufferFrontedFileStoreCache.java	2012-02-13 15:18:20 UTC (rev 3866)
@@ -26,7 +26,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.ObjectInput;
+import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
+import java.io.ObjectOutputStream;
 import java.lang.ref.WeakReference;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -591,7 +593,7 @@
 			hasPermit = true;
 			blockManager = getBlockManager(s.getId(), entry.getId(), EMPTY_ADDRESS);
 			BlockOutputStream bos = new BlockOutputStream(blockManager, memoryBlocks);
-			ObjectOutput dos = new DataObjectOutputStream(bos);
+			ObjectOutput dos = new ObjectOutputStream(bos);
 			dos.writeLong(s.getId());
 			dos.writeLong(entry.getId());
 			dos.writeInt(entry.getSizeEstimate());
@@ -719,7 +721,7 @@
 			if (lock != null) {
 				is = readIntoMemory(info, is, lock, memoryBlocks);
 			}
-			ObjectInput dis = new DataObjectInputStream(is);
+			ObjectInput dis = new ObjectInputStream(is);
 			dis.readFully(HEADER_SKIP_BUFFER);
 			int sizeEstimate = dis.readInt();
 			CacheEntry ce = new CacheEntry(new CacheKey(oid, 1, 1), sizeEstimate, serializer.deserialize(dis), ref, true);

Deleted: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java	2012-02-10 22:32:36 UTC (rev 3865)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectInputStream.java	2012-02-13 15:18:20 UTC (rev 3866)
@@ -1,105 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectStreamClass;
-import java.io.StreamCorruptedException;
-
-public class DataObjectInputStream extends DataInputStream implements ObjectInput {
-	
-	ObjectInput ois;
-	ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-	
-	public DataObjectInputStream(InputStream in) {
-		super(in);
-	}
-	
-	@Override
-	public Object readObject() throws ClassNotFoundException, IOException {
-		if (ois == null) {
-			ois = new ObjectInputStream(this) {
-				
-				@Override
-				protected void readStreamHeader() throws IOException,
-						StreamCorruptedException {
-					int version = readByte() & 0xFF;
-			        if (version != STREAM_VERSION) {
-			            throw new StreamCorruptedException("Unsupported version: " + version); //$NON-NLS-1$
-			        }
-				}
-				
-			    @Override
-			    protected ObjectStreamClass readClassDescriptor()
-			            throws IOException, ClassNotFoundException {
-			        int type = read();
-			        if (type < 0) {
-			            throw new EOFException();
-			        }
-			        switch (type) {
-			        case DataObjectOutputStream.TYPE_FAT_DESCRIPTOR:
-			            return super.readClassDescriptor();
-			        case DataObjectOutputStream.TYPE_THIN_DESCRIPTOR:
-			            String className = readUTF();
-			            Class<?> clazz = loadClass(className);
-			            return ObjectStreamClass.lookup(clazz);
-			        default:
-			        	className = DataObjectOutputStream.typeMapping.get((byte)type);
-			        	if (className == null) {
-			        		throw new StreamCorruptedException("Unknown class type " + type); //$NON-NLS-1$
-			        	}
-			            clazz = loadClass(className);
-			            return ObjectStreamClass.lookup(clazz);
-			        }
-			    }
-
-			    @Override
-			    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
-			        String className = desc.getName();
-			        try {
-			            return loadClass(className);
-			        } catch (ClassNotFoundException ex) {
-			            return super.resolveClass(desc);
-			        }
-			    }
-
-			    protected Class<?> loadClass(String className) throws ClassNotFoundException {
-			        Class<?> clazz;
-			        if (classLoader != null) {
-			            clazz = classLoader.loadClass(className);
-			        } else {
-			            clazz = Class.forName(className);
-			        }
-			        return clazz;
-			    }
-			};
-		}
-		return ois.readObject();
-	}
-
-}

Deleted: branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java
===================================================================
--- branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java	2012-02-10 22:32:36 UTC (rev 3865)
+++ branches/7.7.x/engine/src/main/java/org/teiid/common/buffer/impl/DataObjectOutputStream.java	2012-02-13 15:18:20 UTC (rev 3866)
@@ -1,99 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * See the COPYRIGHT.txt file distributed with this work for information
- * regarding copyright ownership.  Some portions may be licensed
- * to Red Hat, Inc. under one or more contributor license agreements.
- * 
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- * 
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
- * 02110-1301 USA.
- */
-
-package org.teiid.common.buffer.impl;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.io.ObjectStreamClass;
-import java.io.OutputStream;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Extends the logic of Netty's CompactObjectOutputStream to use byte identifiers
- * for some classes and to write/flush objects directly into the output.
- * 
- * We can do this since buffer serialized data is ephemeral and good only for
- * a single process.
- */
-public class DataObjectOutputStream extends DataOutputStream implements ObjectOutput {
-	
-	private static final int MAX_BYTE_IDS = 254;
-	static AtomicInteger counter = new AtomicInteger(2);
-	static final ConcurrentHashMap<String, Byte> knownClasses = new ConcurrentHashMap<String, Byte>();
-	static final ConcurrentHashMap<Byte, String> typeMapping = new ConcurrentHashMap<Byte, String>();
-	
-    static final int TYPE_FAT_DESCRIPTOR = 0;
-    static final int TYPE_THIN_DESCRIPTOR = 1;
-    
-	ObjectOutputStream oos;
-	
-	public DataObjectOutputStream(OutputStream out) {
-		super(out);
-	}
-
-	@Override
-	public void writeObject(Object obj) throws IOException {
-		if (oos == null) {
-			oos = new ObjectOutputStream(this) {
-				@Override
-				protected void writeStreamHeader() throws IOException {
-					writeByte(STREAM_VERSION);
-				}
-				
-				@Override
-			    protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
-			        Class<?> clazz = desc.forClass();
-			        if (clazz.isPrimitive() || clazz.isArray()) {
-			            write(TYPE_FAT_DESCRIPTOR);
-			            super.writeClassDescriptor(desc);
-			        } else {
-			        	String name = desc.getName();
-			        	Byte b = knownClasses.get(name);
-			        	if (b == null && counter.get() < MAX_BYTE_IDS) {
-		        			synchronized (DataObjectOutputStream.class) {
-								b = knownClasses.get(name);
-								if (b == null && counter.get() < 254) {
-									b = (byte)counter.getAndIncrement();
-									knownClasses.put(name, b);
-									typeMapping.put(b, name);
-								}
-			        		}
-			        	}
-			        	if (b != null) {
-			        		write(b);
-			        	} else {
-				            write(TYPE_THIN_DESCRIPTOR);
-				            writeUTF(name);
-			        	}
-			        }
-			    }
-			};
-		}
-		oos.writeObject(obj);
-		oos.flush();
-	}
-	
-}

Modified: branches/7.7.x/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java
===================================================================
--- branches/7.7.x/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java	2012-02-10 22:32:36 UTC (rev 3865)
+++ branches/7.7.x/test-integration/perf/src/test/java/org/teiid/query/eval/TestEnginePerformance.java	2012-02-13 15:18:20 UTC (rev 3866)
@@ -25,7 +25,13 @@
 import static org.junit.Assert.*;
 import static org.teiid.query.processor.TestProcessor.*;
 
+import java.io.ByteArrayInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.sql.Date;
+import java.sql.Time;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -41,6 +47,7 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.teiid.api.exception.query.QueryParserException;
+import org.teiid.client.BatchSerializer;
 import org.teiid.common.buffer.BlockedException;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.TupleBatch;
@@ -51,6 +58,7 @@
 import org.teiid.core.TeiidException;
 import org.teiid.core.TeiidProcessingException;
 import org.teiid.core.types.DataTypeManager;
+import org.teiid.core.util.AccessibleByteArrayOutputStream;
 import org.teiid.core.util.UnitTestUtil;
 import org.teiid.query.metadata.QueryMetadataInterface;
 import org.teiid.query.optimizer.capabilities.CapabilitiesFinder;
@@ -371,7 +379,71 @@
 	@Test public void runLike_16() throws Exception {
 		helpTestLike(50000, 16);
 	}
+	
+	@Test public void runBatchSerialization_String() throws Exception {
+		String[] types = new String[] {DataTypeManager.DefaultDataTypes.STRING};
+		int size = 1024;
+		
+		final List<List<?>> batch = new ArrayList<List<?>>();
+		for (int i = 0; i < size; i++) {
+			batch.add(Arrays.asList(String.valueOf(i)));
+		}
+		helpTestBatchSerialization(types, batch, 50000, 2);
+	}
+	
+	@Test public void runBatchSerialization_Time() throws Exception {
+		final String[] types = new String[] {DataTypeManager.DefaultDataTypes.TIME};
+		int size = 1024;
+		
+		final List<List<?>> batch = new ArrayList<List<?>>();
+		for (int i = 0; i < size; i++) {
+			batch.add(Arrays.asList(new Time(i)));
+		}
+		helpTestBatchSerialization(types, batch, 50000, 2);
+	}
+	
+	@Test public void runBatchSerialization_Date() throws Exception {
+		final String[] types = new String[] {DataTypeManager.DefaultDataTypes.DATE};
+		int size = 1024;
+		
+		final List<List<?>> batch = new ArrayList<List<?>>();
+		for (int i = 0; i < size; i++) {
+			batch.add(Arrays.asList(new Date(i)));
+		}
+		helpTestBatchSerialization(types, batch, 50000, 2);
+	}
+	
+	private void helpTestBatchSerialization(final String[] types,
+			final List<List<?>> batch, int iterations, int threadCount)
+			throws InterruptedException, Exception {
+		runTask(iterations, threadCount, new Task() {
+			
+			@Override
+			public Void call() throws Exception {
+				writeReadBatch(types, batch);
+				return null;
+			}
+		});
+	}
 
+	private List<List<Object>> writeReadBatch(String[] types, List<List<?>> batch)
+			throws IOException, ClassNotFoundException {
+		AccessibleByteArrayOutputStream baos = new AccessibleByteArrayOutputStream(5000);
+		ObjectOutputStream out = new ObjectOutputStream(baos);
+		BatchSerializer.writeBatch(out, types, batch);
+        out.flush();
+        
+        byte[] bytes = baos.getBuffer();
+        
+        ByteArrayInputStream bytesIn = new ByteArrayInputStream(bytes, 0, baos.getCount());
+        ObjectInputStream in = new ObjectInputStream(bytesIn);
+        List<List<Object>> newBatch = BatchSerializer.readBatch(in, types);
+        out.close();
+        in.close();
+        assertEquals(batch.size(), newBatch.size());
+        return newBatch;
+	}
+
 	private void helpTestLike(int iterations, int threads) throws QueryParserException,
 			InterruptedException, Exception {
 		final Expression ex = QueryParser.getQueryParser().parseExpression("'abcdefg' like 'a%g'");



More information about the teiid-commits mailing list