Author: clebert.suconic(a)jboss.com
Date: 2009-11-18 16:29:15 -0500 (Wed, 18 Nov 2009)
New Revision: 8316
Added:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
Removed:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Tweaks
Deleted:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java 2009-11-18 20:44:12 UTC (rev 8315)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -1,49 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion;
-
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.IOCompletion;
-
-
-/**
- * This represents a set of operations done as part of replication.
- * When the entire set is done a group of Runnables can be executed.
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface OperationContext extends IOCompletion
-{
-
- boolean hasData();
-
- void executeOnCompletion(IOAsyncTask runnable);
-
- /** To be called when there are no more operations pending */
- void complete();
-
- /** Flush all pending callbacks on the Context */
- void flush();
-
- /** Replication may need some extra controls to guarantee ordering
- * when nothing is persisted through the contexts
- * @return The context is empty
- */
- boolean isEmpty();
-
- void setEmpty(boolean empty);
-
-}
Deleted:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java 2009-11-18 20:44:12 UTC (rev 8315)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationExceptionCallback.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -1,26 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion;
-
-/**
- * A OperationExceptionCallback
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public interface OperationExceptionCallback
-{
- void onError(int errorCode, String errorMessage);
-}
Deleted:
branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java 2009-11-18 20:44:12 UTC (rev 8315)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -1,155 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.completion.impl;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.hornetq.core.completion.OperationContext;
-import org.hornetq.core.journal.IOAsyncTask;
-
-/**
- * A ReplicationToken
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * TODO: Maybe I should move this to persistence.journal. I need to check a few dependencies first.
- *
- */
-public class OperationContextImpl implements OperationContext
-{
- private static final ThreadLocal<OperationContext> tlContext = new ThreadLocal<OperationContext>();
-
- public static OperationContext getContext()
- {
- OperationContext token = tlContext.get();
- if (token == null)
- {
- token = new OperationContextImpl();
- tlContext.set(token);
- }
- return token;
- }
-
- private List<IOAsyncTask> tasks;
-
- private int linedup = 0;
-
- private int replicated = 0;
-
- private boolean empty = false;
-
- private volatile boolean complete = false;
-
- /**
- * @param executor
- */
- public OperationContextImpl()
- {
- super();
- }
-
- /** To be called by the replication manager, when new replication is added to the queue */
- public void linedUp()
- {
- linedup++;
- }
-
- public boolean hasData()
- {
- return linedup > 0;
- }
-
- /** You may have several actions to be done after a replication operation is completed. */
- public void executeOnCompletion(IOAsyncTask completion)
- {
- if (complete)
- {
- // Sanity check, this shouldn't happen
- throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
- }
-
- if (tasks == null)
- {
- // No need to use Concurrent, we only add from a single thread.
- // We don't add any more Runnables after it is complete
- tasks = new LinkedList<IOAsyncTask>();
- }
-
- tasks.add(completion);
- }
-
- /** To be called by the replication manager, when data is confirmed on the channel */
- public synchronized void done()
- {
- if (++replicated == linedup && complete)
- {
- flush();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationToken#complete()
- */
- public synchronized void complete()
- {
- tlContext.set(null);
- complete = true;
- if (replicated == linedup && complete)
- {
- flush();
- }
- }
-
- public synchronized void flush()
- {
- if (tasks != null)
- {
- for (IOAsyncTask run : tasks)
- {
- run.done();
- }
- tasks.clear();
- }
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
- */
- public boolean isEmpty()
- {
- return empty;
- }
-
- public void setEmpty(final boolean sync)
- {
- this.empty = sync;
- }
-
-
- /* (non-Javadoc)
- * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
- */
- public void onError(int errorCode, String errorMessage)
- {
- if (tasks != null)
- {
- for (IOAsyncTask run : tasks)
- {
- run.onError(errorCode, errorMessage);
- }
- }
- }
-
-}
Copied:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java (from rev 8315, branches/ClebertTemporary/src/main/org/hornetq/core/completion/OperationContext.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.IOCompletion;
+
+
+/**
+ * This represents a set of operations done as part of replication.
+ * When the entire set is done a group of Runnables can be executed.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public interface OperationContext extends IOCompletion
+{
+
+ boolean hasData();
+
+ void executeOnCompletion(IOAsyncTask runnable);
+
+ /** To be called when there are no more operations pending */
+ void complete();
+
+ /** Flush all pending callbacks on the Context */
+ void flush();
+
+ /** Replication may need some extra controls to guarantee ordering
+ * when nothing is persisted through the contexts
+ * @return The context is empty
+ */
+ boolean isEmpty();
+
+ void setEmpty(boolean empty);
+
+}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18 20:44:12 UTC (rev 8315)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -31,7 +31,6 @@
import javax.transaction.xa.Xid;
import org.hornetq.core.buffers.ChannelBuffers;
-import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
Copied:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java (from rev 8315, branches/ClebertTemporary/src/main/org/hornetq/core/completion/impl/OperationContextImpl.java)
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java (rev 0)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.persistence.impl.journal;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.persistence.OperationContext;
+
+/**
+ * A ReplicationToken
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * TODO: Maybe I should move this to persistence.journal. I need to check a few dependencies first.
+ *
+ */
+public class OperationContextImpl implements OperationContext
+{
+ private static final ThreadLocal<OperationContext> tlContext = new ThreadLocal<OperationContext>();
+
+ public static OperationContext getContext()
+ {
+ OperationContext token = tlContext.get();
+ if (token == null)
+ {
+ token = new OperationContextImpl();
+ tlContext.set(token);
+ }
+ return token;
+ }
+
+ private List<IOAsyncTask> tasks;
+
+ private int linedup = 0;
+
+ private int replicated = 0;
+
+ private boolean empty = false;
+
+ private volatile boolean complete = false;
+
+ /**
+ * @param executor
+ */
+ public OperationContextImpl()
+ {
+ super();
+ }
+
+ /** To be called by the replication manager, when new replication is added to the queue */
+ public void linedUp()
+ {
+ linedup++;
+ }
+
+ public boolean hasData()
+ {
+ return linedup > 0;
+ }
+
+ /** You may have several actions to be done after a replication operation is completed. */
+ public void executeOnCompletion(IOAsyncTask completion)
+ {
+ if (complete)
+ {
+ // Sanity check, this shouldn't happen
+ throw new IllegalStateException("The Replication Context is complete, and no more tasks are accepted");
+ }
+
+ if (tasks == null)
+ {
+ // No need to use Concurrent, we only add from a single thread.
+ // We don't add any more Runnables after it is complete
+ tasks = new LinkedList<IOAsyncTask>();
+ }
+
+ tasks.add(completion);
+ }
+
+ /** To be called by the replication manager, when data is confirmed on the channel */
+ public synchronized void done()
+ {
+ if (++replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationToken#complete()
+ */
+ public synchronized void complete()
+ {
+ tlContext.set(null);
+ complete = true;
+ if (replicated == linedup && complete)
+ {
+ flush();
+ }
+ }
+
+ public synchronized void flush()
+ {
+ if (tasks != null)
+ {
+ for (IOAsyncTask run : tasks)
+ {
+ run.done();
+ }
+ tasks.clear();
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationContext#isRoundtrip()
+ */
+ public boolean isEmpty()
+ {
+ return empty;
+ }
+
+ public void setEmpty(final boolean sync)
+ {
+ this.empty = sync;
+ }
+
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
+ */
+ public void onError(int errorCode, String errorMessage)
+ {
+ if (tasks != null)
+ {
+ for (IOAsyncTask run : tasks)
+ {
+ run.onError(errorCode, errorMessage);
+ }
+ }
+ }
+
+}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-18 20:44:12 UTC (rev 8315)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -15,11 +15,11 @@
import java.util.Set;
-import org.hornetq.core.completion.OperationContext;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.utils.SimpleString;
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-18 20:44:12 UTC (rev 8315)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -22,13 +22,13 @@
import org.hornetq.core.client.SessionFailureListener;
import org.hornetq.core.client.impl.FailoverManager;
-import org.hornetq.core.completion.OperationContext;
-import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-18 20:44:12 UTC (rev 8315)
+++ branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-18 21:29:15 UTC (rev 8316)
@@ -32,7 +32,6 @@
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.FailoverManager;
import org.hornetq.core.client.impl.FailoverManagerImpl;
-import org.hornetq.core.completion.impl.OperationContextImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -53,6 +52,7 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;