[teiid-commits] teiid SVN: r3452 - in trunk/engine/src: main/java/org/teiid/common/buffer/impl and 3 other directories.
teiid-commits at lists.jboss.org
teiid-commits at lists.jboss.org
Tue Sep 6 16:08:59 EDT 2011
Author: shawkins
Date: 2011-09-06 16:08:58 -0400 (Tue, 06 Sep 2011)
New Revision: 3452
Modified:
trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
trunk/engine/src/main/resources/org/teiid/query/i18n.properties
trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
Log:
TEIID-942 TEIID-1742 refining transaction support, fixing ids to use longs.
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/SPage.java 2011-09-06 20:08:58 UTC (rev 3452)
@@ -31,6 +31,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
import org.teiid.common.buffer.BatchManager.CleanupHook;
import org.teiid.common.buffer.BatchManager.ManagedBatch;
@@ -83,25 +84,25 @@
}
}
}
-
+
+ private static AtomicLong counter = new AtomicLong();
+
STree stree;
- private int id;
+ private long id;
protected SPage next;
protected SPage prev;
protected ManagedBatch managedBatch;
private Object trackingObject;
protected TupleBatch values;
protected ArrayList<SPage> children;
- //TODO: could track cloning more completely, which would allow for earlier batch removal
- private boolean cloned;
+ protected boolean cloned;
SPage(STree stree, boolean leaf) {
this.stree = stree;
- this.id = stree.counter.getAndIncrement();
+ this.id = counter.getAndIncrement();
stree.pages.put(this.id, this);
- //TODO: this counter is a hack. need a better idea of a storage id
- this.values = new TupleBatch(id, new ArrayList(stree.pageSize/4));
+ this.values = new TupleBatch(0, new ArrayList(stree.pageSize/4));
if (!leaf) {
children = new ArrayList<SPage>(stree.pageSize/4);
}
@@ -121,7 +122,7 @@
clone.children = new ArrayList<SPage>(children);
}
if (values != null) {
- clone.values = new TupleBatch(stree.counter.getAndIncrement(), new ArrayList<List<?>>(values.getTuples()));
+ clone.values = new TupleBatch(0, new ArrayList<List<?>>(values.getTuples()));
}
return clone;
} catch (CloneNotSupportedException e) {
@@ -129,7 +130,7 @@
}
}
- public int getId() {
+ public long getId() {
return id;
}
@@ -196,7 +197,6 @@
values.setDataTypes(stree.types);
}
if (cloned) {
- values.setRowOffset(stree.counter.getAndIncrement());
cloned = false;
trackingObject = null;
}
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/STree.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/STree.java 2011-09-06 20:08:58 UTC (rev 3452)
@@ -57,8 +57,7 @@
private int mask = 1;
private int shift = 1;
- protected AtomicInteger counter = new AtomicInteger();
- protected ConcurrentHashMap<Integer, SPage> pages = new ConcurrentHashMap<Integer, SPage>();
+ protected ConcurrentHashMap<Long, SPage> pages = new ConcurrentHashMap<Long, SPage>();
protected volatile SPage[] header = new SPage[] {new SPage(this, true)};
protected BatchManager keyManager;
protected BatchManager leafManager;
@@ -103,12 +102,12 @@
clone.updateLock = new ReentrantLock();
clone.rowCount = new AtomicInteger(rowCount.get());
//clone the pages
- clone.pages = new ConcurrentHashMap<Integer, SPage>(pages);
- for (Map.Entry<Integer, SPage> entry : clone.pages.entrySet()) {
+ clone.pages = new ConcurrentHashMap<Long, SPage>(pages);
+ for (Map.Entry<Long, SPage> entry : clone.pages.entrySet()) {
entry.setValue(entry.getValue().clone(clone));
}
//reset the pointers
- for (Map.Entry<Integer, SPage> entry : clone.pages.entrySet()) {
+ for (Map.Entry<Long, SPage> entry : clone.pages.entrySet()) {
SPage clonePage = entry.getValue();
clonePage.next = clone.getPage(clonePage.next);
clonePage.prev = clone.getPage(clonePage.prev);
@@ -447,7 +446,7 @@
}
public void remove() {
- truncate(false);
+ truncate(true);
this.keyManager.remove();
this.leafManager.remove();
}
@@ -530,4 +529,12 @@
this.comparator.setSortParameters(sortParameters);
}
+ public void clearClonedFlags() {
+ for (SPage page : pages.values()) {
+ page.cloned = false;
+ //we don't really care about using synchronization or a volatile here
+ //since the worst case is that we'll just use gc cleanup
+ }
+ }
+
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/common/buffer/impl/BufferManagerImpl.java 2011-09-06 20:08:58 UTC (rev 3452)
@@ -95,12 +95,10 @@
private final class CleanupHook implements org.teiid.common.buffer.BatchManager.CleanupHook {
private long id;
- private int beginRow;
private WeakReference<BatchManagerImpl> ref;
- CleanupHook(long id, int beginRow, BatchManagerImpl batchManager) {
+ CleanupHook(long id, BatchManagerImpl batchManager) {
this.id = id;
- this.beginRow = beginRow;
this.ref = new WeakReference<BatchManagerImpl>(batchManager);
}
@@ -109,7 +107,7 @@
if (batchManager == null) {
return;
}
- cleanupManagedBatch(batchManager, beginRow, id);
+ cleanupManagedBatch(batchManager, id);
}
}
@@ -204,10 +202,10 @@
* Holder for active batches
*/
private class TupleBufferInfo {
- TreeMap<Integer, ManagedBatchImpl> batches = new TreeMap<Integer, ManagedBatchImpl>();
- Integer lastUsed = null;
+ TreeMap<Long, ManagedBatchImpl> batches = new TreeMap<Long, ManagedBatchImpl>();
+ Long lastUsed = null;
- ManagedBatchImpl removeBatch(int row) {
+ ManagedBatchImpl removeBatch(long row) {
ManagedBatchImpl result = batches.remove(row);
if (result != null) {
activeBatchKB -= result.sizeEstimate;
@@ -268,7 +266,7 @@
if (update) {
activeBatches.put(batchManager.id, tbi);
}
- Assertion.isNull(tbi.batches.put(this.beginRow, this));
+ tbi.batches.put(this.id, this);
}
}
@@ -283,13 +281,13 @@
if (tbi != null) {
boolean put = true;
if (!cache) {
- tbi.removeBatch(this.beginRow);
+ tbi.removeBatch(this.id);
if (tbi.batches.isEmpty()) {
put = false;
}
}
if (put) {
- tbi.lastUsed = this.beginRow;
+ tbi.lastUsed = this.id;
activeBatches.put(batchManager.id, tbi);
}
}
@@ -397,17 +395,17 @@
}
public void remove() {
- cleanupManagedBatch(batchManager, beginRow, id);
+ cleanupManagedBatch(batchManager, id);
}
@Override
public CleanupHook getCleanupHook() {
- return new CleanupHook(id, beginRow, batchManager);
+ return new CleanupHook(id, batchManager);
}
@Override
public String toString() {
- return "ManagedBatch " + batchManager.id + " " + this.beginRow + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
+ return "ManagedBatch " + batchManager.id + " " + this.id + " " + activeBatch; //$NON-NLS-1$ //$NON-NLS-2$ //$NON-NLS-3$
}
}
@@ -513,10 +511,10 @@
return tupleBuffer;
}
- private void cleanupManagedBatch(BatchManagerImpl batchManager, int beginRow, long id) {
+ private void cleanupManagedBatch(BatchManagerImpl batchManager, long id) {
synchronized (activeBatches) {
TupleBufferInfo tbi = activeBatches.get(batchManager.id);
- if (tbi != null && tbi.removeBatch(beginRow) != null) {
+ if (tbi != null && tbi.removeBatch(id) != null) {
if (tbi.batches.isEmpty()) {
activeBatches.remove(batchManager.id);
}
@@ -655,7 +653,7 @@
}
Iterator<TupleBufferInfo> iter = activeBatches.values().iterator();
TupleBufferInfo tbi = iter.next();
- Map.Entry<Integer, ManagedBatchImpl> entry = null;
+ Map.Entry<Long, ManagedBatchImpl> entry = null;
if (tbi.lastUsed != null) {
entry = tbi.batches.floorEntry(tbi.lastUsed - 1);
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/GlobalTableStoreImpl.java 2011-09-06 20:08:58 UTC (rev 3452)
@@ -290,7 +290,7 @@
if (tempTable != null) {
TempMetadataID id = tableStore.getMetadataStore().getTempGroupID(matTableName);
synchronized (id) {
- boolean clone = tempTable.getActiveReaders().get() != 0;
+ boolean clone = tempTable.getActive().get() != 0;
if (clone) {
tempTable = tempTable.clone();
}
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTable.java 2011-09-06 20:08:58 UTC (rev 3452)
@@ -36,6 +36,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.teiid.api.exception.query.ExpressionEvaluationException;
@@ -275,9 +276,9 @@
}
}
- private static AtomicInteger ID_GENERATOR = new AtomicInteger();
+ private static AtomicLong ID_GENERATOR = new AtomicLong();
- private int id = ID_GENERATOR.getAndIncrement();
+ private Long id = ID_GENERATOR.getAndIncrement();
private STree tree;
private AtomicInteger rowId;
private List<ElementSymbol> columns;
@@ -361,7 +362,7 @@
}
}
- public AtomicInteger getActiveReaders() {
+ public AtomicInteger getActive() {
return activeReaders;
}
@@ -803,13 +804,13 @@
return tid;
}
- public int getId() {
+ public Long getId() {
return id;
}
@Override
public int hashCode() {
- return id;
+ return id.hashCode();
}
@Override
@@ -821,7 +822,7 @@
return false;
}
TempTable other = (TempTable)obj;
- return id == other.id;
+ return id.equals(other.id);
}
}
\ No newline at end of file
Modified: trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java
===================================================================
--- trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/java/org/teiid/query/tempdata/TempTableStore.java 2011-09-06 20:08:58 UTC (rev 3452)
@@ -82,7 +82,7 @@
public class TempTableSynchronization implements Synchronization {
private String id;
- Set<Integer> existingTables = new HashSet<Integer>();
+ Set<Long> existingTables = new HashSet<Long>();
ConcurrentHashMap<String, TempTable> tables = new ConcurrentHashMap<String, TempTable>();
private List<TransactionCallback> callbacks = new LinkedList<TransactionCallback>();
@@ -131,10 +131,21 @@
@Override
public synchronized void afterCompletion(int status) {
- //TODO: cleanup tables
completed = true;
synchronizations.remove(id);
- for (TransactionCallback callback : callbacks) {
+ if (transactionMode == TransactionMode.ISOLATE_READS) {
+ for (TempTable table : tables.values()) {
+ table.getActive().decrementAndGet();
+ }
+ } else {
+ HashSet<TempTable> current = new HashSet<TempTable>(tempTables.values());
+ current.retainAll(tables.values());
+ for (TempTable table : current) {
+ table.getActive().set(0);
+ table.getTree().clearClonedFlags();
+ }
+ }
+ for (TransactionCallback callback : callbacks) {
if (status == Status.STATUS_COMMITTED) {
callback.commit();
} else {
@@ -144,6 +155,10 @@
callbacks.clear();
}
+ public boolean isCompleted() {
+ return completed;
+ }
+
@Override
public void beforeCompletion() {
@@ -307,11 +322,21 @@
if (synch != null && synch.existingTables.contains(tempTable.getId())) {
TempTable result = synch.tables.get(tempTableID);
if (result == null) {
- synch.tables.put(tempTableID, tempTable.clone());
+ synchronized (synch) {
+ if (synch.isCompleted()) {
+ throw new AssertionError("Expected active transaction"); //$NON-NLS-1$
+ }
+ if (!tempTable.getActive().compareAndSet(0, 1)) {
+ throw new TeiidProcessingException(QueryPlugin.Util.getString("TempTableStore.pending_update", tempTableID)); //$NON-NLS-1$
+ }
+ synch.tables.put(tempTableID, tempTable.clone());
+ }
}
return tempTable;
}
- }
+ } else if (tempTable.getActive().get() != 0) {
+ throw new TeiidProcessingException(QueryPlugin.Util.getString("TempTableStore.pending_update", tempTableID)); //$NON-NLS-1$
+ }
}
} else if (transactionMode == TransactionMode.ISOLATE_READS) {
TransactionContext tc = context.getTransactionContext();
@@ -320,24 +345,13 @@
if (synch != null) {
TempTable result = synch.tables.get(tempTableID);
if (result == null) {
- synch.tables.put(tempTableID, tempTable);
result = tempTable;
- result.getActiveReaders().getAndIncrement();
- TransactionCallback callback = new TransactionCallback() {
-
- @Override
- public void rollback() {
- tempTable.getActiveReaders().getAndDecrement();
- }
-
- @Override
- public void commit() {
- tempTable.getActiveReaders().getAndDecrement();
- }
- };
- if (!synch.addCallback(callback)) {
- callback.rollback();
- }
+ synchronized (synch) {
+ if (!synch.isCompleted()) {
+ synch.tables.put(tempTableID, tempTable);
+ result.getActive().getAndIncrement();
+ }
+ }
}
return result;
}
Modified: trunk/engine/src/main/resources/org/teiid/query/i18n.properties
===================================================================
--- trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/main/resources/org/teiid/query/i18n.properties 2011-09-06 20:08:58 UTC (rev 3452)
@@ -791,6 +791,7 @@
RulePlanJoins.cantSatisfy=Join region with unsatisfied access patterns cannot be satisfied by the join criteria, Access patterns: {0}
TempTableStore.table_exist_error=Temporary table "{0}" already exists.
TempTableStore.table_doesnt_exist_error=Temporary table "{0}" does not exist.
+TempTableStore.pending_update=Table {0} is locked by pending transaction update.
XMLQueryPlanner.cannot_plan=Cannot create a query for MappingClass with user criteria {0}
XMLQueryPlanner.invalid_relationship=Conjunct "{0}" has no relationship with target context {1}.
Modified: trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java
===================================================================
--- trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2011-09-05 14:27:44 UTC (rev 3451)
+++ trunk/engine/src/test/java/org/teiid/query/processor/TestTempTables.java 2011-09-06 20:08:58 UTC (rev 3452)
@@ -135,6 +135,34 @@
execute("select * from x", new List[] {});
}
+ @Test public void testCommitExistingRemoved() throws Exception {
+ execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+ setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+ execute("drop table x", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+ synch.afterCompletion(Status.STATUS_COMMITTED);
+ try {
+ execute("select * from x", new List[] {});
+ fail();
+ } catch (Exception e) {
+
+ }
+ }
+
+ @Test public void testUpdateLock() throws Exception {
+ execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
+ setupTransaction(Connection.TRANSACTION_SERIALIZABLE);
+ execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+ tc = null;
+ try {
+ execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+ fail();
+ } catch (Exception e) {
+
+ }
+ synch.afterCompletion(Status.STATUS_COMMITTED);
+ execute("insert into x (e2, e1) select e2, e1 from pm1.g1", new List[] {Arrays.asList(6)}); //$NON-NLS-1$
+ }
+
@Test public void testRollbackExisting1() throws Exception {
execute("create local temporary table x (e1 string, e2 integer)", new List[] {Arrays.asList(0)}); //$NON-NLS-1$
for (int i = 0; i < 86; i++) {
More information about the teiid-commits
mailing list