[teiid-commits] teiid SVN: r3257 - in branches/7.4.x/engine/src: test/java/org/teiid/dqp/internal/process and 1 other directory.

teiid-commits at lists.jboss.org teiid-commits at lists.jboss.org
Wed Jun 22 16:23:26 EDT 2011


Author: shawkins
Date: 2011-06-22 16:23:26 -0400 (Wed, 22 Jun 2011)
New Revision: 3257

Modified:
   branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
   branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
Log:
TEIID-1647 adding synchronization to prevent lob exception

Modified: branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-06-18 12:21:26 UTC (rev 3256)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java	2011-06-22 20:23:26 UTC (rev 3257)
@@ -445,15 +445,17 @@
 					doneProducingBatches();
 				}
 				addToCache();
-				add = sendResultsIfNeeded(batch);
-				if (!added) {
-					super.flushBatchDirect(batch, add);
-					//restrict the buffer size for forward only results
-					if (add && !processor.hasFinalBuffer()
-							&& !batch.getTerminationFlag() 
-							&& this.getTupleBuffer().getManagedRowCount() >= 20 * this.getTupleBuffer().getBatchSize()) {
-						//requestMore will trigger more processing
-						throw BlockedException.block(requestID, "Blocking due to full results buffer."); //$NON-NLS-1$
+				synchronized (lobStreams) {
+					add = sendResultsIfNeeded(batch);
+					if (!added) {
+						super.flushBatchDirect(batch, add);
+						//restrict the buffer size for forward only results
+						if (add && !processor.hasFinalBuffer()
+								&& !batch.getTerminationFlag() 
+								&& this.getTupleBuffer().getManagedRowCount() >= 20 * this.getTupleBuffer().getBatchSize()) {
+							//requestMore will trigger more processing
+							throw BlockedException.block(requestID, "Blocking due to full results buffer."); //$NON-NLS-1$
+						}
 					}
 				}
 			}
@@ -668,10 +670,10 @@
     public void processLobChunkRequest(String id, int streamRequestId, ResultsReceiver<LobChunk> chunckReceiver) {
     	LobWorkItem workItem = null;
     	synchronized (lobStreams) {
-            workItem = this.lobStreams.get(new Integer(streamRequestId));
+            workItem = this.lobStreams.get(streamRequestId);
             if (workItem == null) {
             	workItem = new LobWorkItem(this, dqpCore, id, streamRequestId);
-            	lobStreams.put(new Integer(streamRequestId), workItem);
+            	lobStreams.put(streamRequestId, workItem);
             }
 		}
     	workItem.setResultsReceiver(chunckReceiver);
@@ -683,7 +685,7 @@
     }
     
     public void removeLobStream(int streamRequestId) {
-        this.lobStreams.remove(new Integer(streamRequestId));
+        this.lobStreams.remove(streamRequestId);
     } 
     
     public boolean requestCancel() throws TeiidComponentException {

Modified: branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-06-18 12:21:26 UTC (rev 3256)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java	2011-06-22 20:23:26 UTC (rev 3257)
@@ -41,9 +41,13 @@
 import org.teiid.client.RequestMessage;
 import org.teiid.client.ResultsMessage;
 import org.teiid.client.RequestMessage.StatementType;
+import org.teiid.client.lob.LobChunk;
+import org.teiid.client.util.ResultsFuture;
 import org.teiid.common.buffer.BufferManager;
 import org.teiid.common.buffer.BufferManagerFactory;
 import org.teiid.common.buffer.impl.BufferManagerImpl;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.types.BlobType;
 import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
 import org.teiid.dqp.internal.datamgr.FakeTransactionService;
 import org.teiid.dqp.internal.process.AbstractWorkItem.ThreadState;
@@ -57,7 +61,41 @@
 @SuppressWarnings("nls")
 public class TestDQPCore {
 
-    private DQPCore core;
+    private final class LobThread extends Thread {
+		BlobType bt;
+		private final RequestMessage reqMsg;
+		volatile ResultsFuture<LobChunk> chunkFuture;
+		protected DQPWorkContext workContext;
+
+		private LobThread(RequestMessage reqMsg) {
+			this.reqMsg = reqMsg;
+		}
+		
+		@Override
+		public void run() {
+			synchronized (this) {
+				while (workContext == null) {
+					try {
+						this.wait();
+					} catch (InterruptedException e) {
+					}
+				}
+			}
+			workContext.runInContext(new Runnable() {
+				
+				@Override
+				public void run() {
+					try {
+						chunkFuture = core.requestNextLobChunk(1, reqMsg.getExecutionId(), bt.getReferenceStreamId());
+					} catch (TeiidProcessingException e) {
+						e.printStackTrace();
+					}
+				}
+			});
+		}
+	}
+
+	private DQPCore core;
     private DQPConfiguration config;
     private AutoGenDataService agds;
 
@@ -411,6 +449,34 @@
         assertEquals(1, this.core.getRsCache().getCacheHitCount());
     }
     
+    @Test public void testLobConcurrency() throws Exception {
+    	RequestMessage reqMsg = exampleRequestMessage("select to_bytes(stringkey, 'utf-8') FROM BQT1.SmallA"); 
+        reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
+        agds.setSleep(100);
+        ResultsFuture<ResultsMessage> message = core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+        final LobThread t = new LobThread(reqMsg);
+        t.start();
+        message.addCompletionListener(new ResultsFuture.CompletionListener<ResultsMessage>() {
+        	@Override
+        	public void onCompletion(ResultsFuture<ResultsMessage> future) {
+        		try {
+        			final BlobType bt = (BlobType)future.get().getResults()[0].get(0);
+        			t.bt = bt;
+        			t.workContext = DQPWorkContext.getWorkContext();
+        			synchronized (t) {
+            			t.notify();
+					}
+        			Thread.sleep(100); //give the Thread a chance to run
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+        	}
+		});
+        message.get();
+        t.join();
+        assertNotNull(t.chunkFuture.get().getBytes());
+    }
+    
 	public void helpTestVisibilityFails(String sql) throws Exception {
         RequestMessage reqMsg = exampleRequestMessage(sql); 
         reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);



More information about the teiid-commits mailing list