Author: shawkins
Date: 2011-06-22 16:23:26 -0400 (Wed, 22 Jun 2011)
New Revision: 3257
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
Log:
TEIID-1647 adding synchronization to prevent lob exception
Modified:
branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java
===================================================================
--- branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-06-18 12:21:26 UTC (rev 3256)
+++ branches/7.4.x/engine/src/main/java/org/teiid/dqp/internal/process/RequestWorkItem.java 2011-06-22 20:23:26 UTC (rev 3257)
@@ -445,15 +445,17 @@
doneProducingBatches();
}
addToCache();
- add = sendResultsIfNeeded(batch);
- if (!added) {
- super.flushBatchDirect(batch, add);
- //restrict the buffer size for forward only results
- if (add && !processor.hasFinalBuffer()
- && !batch.getTerminationFlag()
- && this.getTupleBuffer().getManagedRowCount() >= 20 *
this.getTupleBuffer().getBatchSize()) {
- //requestMore will trigger more processing
- throw BlockedException.block(requestID, "Blocking due to full results
buffer."); //$NON-NLS-1$
+ synchronized (lobStreams) {
+ add = sendResultsIfNeeded(batch);
+ if (!added) {
+ super.flushBatchDirect(batch, add);
+ //restrict the buffer size for forward only results
+ if (add && !processor.hasFinalBuffer()
+ && !batch.getTerminationFlag()
+ && this.getTupleBuffer().getManagedRowCount() >= 20 *
this.getTupleBuffer().getBatchSize()) {
+ //requestMore will trigger more processing
+ throw BlockedException.block(requestID, "Blocking due to full results
buffer."); //$NON-NLS-1$
+ }
}
}
}
@@ -668,10 +670,10 @@
public void processLobChunkRequest(String id, int streamRequestId,
ResultsReceiver<LobChunk> chunckReceiver) {
LobWorkItem workItem = null;
synchronized (lobStreams) {
- workItem = this.lobStreams.get(new Integer(streamRequestId));
+ workItem = this.lobStreams.get(streamRequestId);
if (workItem == null) {
workItem = new LobWorkItem(this, dqpCore, id, streamRequestId);
- lobStreams.put(new Integer(streamRequestId), workItem);
+ lobStreams.put(streamRequestId, workItem);
}
}
workItem.setResultsReceiver(chunckReceiver);
@@ -683,7 +685,7 @@
}
public void removeLobStream(int streamRequestId) {
- this.lobStreams.remove(new Integer(streamRequestId));
+ this.lobStreams.remove(streamRequestId);
}
public boolean requestCancel() throws TeiidComponentException {
Modified:
branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java
===================================================================
--- branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-06-18 12:21:26 UTC (rev 3256)
+++ branches/7.4.x/engine/src/test/java/org/teiid/dqp/internal/process/TestDQPCore.java 2011-06-22 20:23:26 UTC (rev 3257)
@@ -41,9 +41,13 @@
import org.teiid.client.RequestMessage;
import org.teiid.client.ResultsMessage;
import org.teiid.client.RequestMessage.StatementType;
+import org.teiid.client.lob.LobChunk;
+import org.teiid.client.util.ResultsFuture;
import org.teiid.common.buffer.BufferManager;
import org.teiid.common.buffer.BufferManagerFactory;
import org.teiid.common.buffer.impl.BufferManagerImpl;
+import org.teiid.core.TeiidProcessingException;
+import org.teiid.core.types.BlobType;
import org.teiid.dqp.internal.datamgr.ConnectorManagerRepository;
import org.teiid.dqp.internal.datamgr.FakeTransactionService;
import org.teiid.dqp.internal.process.AbstractWorkItem.ThreadState;
@@ -57,7 +61,41 @@
@SuppressWarnings("nls")
public class TestDQPCore {
- private DQPCore core;
+ private final class LobThread extends Thread {
+ BlobType bt;
+ private final RequestMessage reqMsg;
+ volatile ResultsFuture<LobChunk> chunkFuture;
+ protected DQPWorkContext workContext;
+
+ private LobThread(RequestMessage reqMsg) {
+ this.reqMsg = reqMsg;
+ }
+
+ @Override
+ public void run() {
+ synchronized (this) {
+ while (workContext == null) {
+ try {
+ this.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ workContext.runInContext(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ chunkFuture = core.requestNextLobChunk(1, reqMsg.getExecutionId(),
bt.getReferenceStreamId());
+ } catch (TeiidProcessingException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+
+ private DQPCore core;
private DQPConfiguration config;
private AutoGenDataService agds;
@@ -411,6 +449,34 @@
assertEquals(1, this.core.getRsCache().getCacheHitCount());
}
+ @Test public void testLobConcurrency() throws Exception {
+ RequestMessage reqMsg = exampleRequestMessage("select to_bytes(stringkey,
'utf-8') FROM BQT1.SmallA");
+ reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);
+ agds.setSleep(100);
+ ResultsFuture<ResultsMessage> message =
core.executeRequest(reqMsg.getExecutionId(), reqMsg);
+ final LobThread t = new LobThread(reqMsg);
+ t.start();
+ message.addCompletionListener(new
ResultsFuture.CompletionListener<ResultsMessage>() {
+ @Override
+ public void onCompletion(ResultsFuture<ResultsMessage> future) {
+ try {
+ final BlobType bt = (BlobType)future.get().getResults()[0].get(0);
+ t.bt = bt;
+ t.workContext = DQPWorkContext.getWorkContext();
+ synchronized (t) {
+ t.notify();
+ }
+ Thread.sleep(100); //give the Thread a chance to run
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ message.get();
+ t.join();
+ assertNotNull(t.chunkFuture.get().getBytes());
+ }
+
public void helpTestVisibilityFails(String sql) throws Exception {
RequestMessage reqMsg = exampleRequestMessage(sql);
reqMsg.setTxnAutoWrapMode(RequestMessage.TXN_WRAP_OFF);