Author: manik.surtani(a)jboss.com
Date: 2009-02-13 09:43:12 -0500 (Fri, 13 Feb 2009)
New Revision: 7687
Removed:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/transaction/TransactionLog.java
core/tags/3.0.3.CR1/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
Modified:
core/tags/3.0.3.CR1/pom.xml
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/Version.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/config/Configuration.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
Log:
Reverted to pre NBST state for releasing 3.0.3
Modified: core/tags/3.0.3.CR1/pom.xml
===================================================================
--- core/tags/3.0.3.CR1/pom.xml 2009-02-13 13:43:13 UTC (rev 7686)
+++ core/tags/3.0.3.CR1/pom.xml 2009-02-13 14:43:12 UTC (rev 7687)
@@ -2,504 +2,504 @@
<project
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <properties>
- <jbosscache-core-version>3.1.0-SNAPSHOT</jbosscache-core-version>
- <!-- By default only run tests in the "unit" group -->
- <defaultTestGroup>unit</defaultTestGroup>
- <!-- By default only generate Javadocs when we install the module. -->
- <javadocPhase>install</javadocPhase>
- </properties>
+ <modelVersion>4.0.0</modelVersion>
+ <properties>
+ <jbosscache-core-version>3.0.3.CR1</jbosscache-core-version>
+ <!-- By default only run tests in the "unit" group -->
+ <defaultTestGroup>unit</defaultTestGroup>
+ <!-- By default only generate Javadocs when we install the module. -->
+ <javadocPhase>install</javadocPhase>
+ </properties>
- <parent>
- <groupId>org.jboss.cache</groupId>
- <artifactId>jbosscache-common-parent</artifactId>
- <version>1.5</version>
- </parent>
- <groupId>org.jboss.cache</groupId>
- <artifactId>jbosscache-core</artifactId>
- <version>${jbosscache-core-version}</version>
- <name>JBoss Cache - Core Edition</name>
- <description>JBoss Cache - Core Edition</description>
- <url>http://www.jbosscache.org</url>
- <packaging>jar</packaging>
- <dependencies>
- <dependency>
- <groupId>jgroups</groupId>
- <artifactId>jgroups</artifactId>
- <version>2.6.7.GA</version>
- </dependency>
+ <parent>
+ <groupId>org.jboss.cache</groupId>
+ <artifactId>jbosscache-common-parent</artifactId>
+ <version>1.5</version>
+ </parent>
+ <groupId>org.jboss.cache</groupId>
+ <artifactId>jbosscache-core</artifactId>
+ <version>${jbosscache-core-version}</version>
+ <name>JBoss Cache - Core Edition</name>
+ <description>JBoss Cache - Core Edition</description>
+ <url>http://www.jbosscache.org</url>
+ <packaging>jar</packaging>
+ <dependencies>
+ <dependency>
+ <groupId>jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ <version>2.6.7.GA</version>
+ </dependency>
- <!--
- For the JTA 1.1 API; consuming projects can safely
- exclude this and replace with any valid source of this API, such as a Java
EE app server.
- -->
- <dependency>
- <groupId>javax.transaction</groupId>
- <artifactId>jta</artifactId>
- <version>1.1</version>
- </dependency>
+ <!--
+ For the JTA 1.1 API; consuming projects can safely
+ exclude this and replace with any valid source of this API, such as a Java EE
app server.
+ -->
+ <dependency>
+ <groupId>javax.transaction</groupId>
+ <artifactId>jta</artifactId>
+ <version>1.1</version>
+ </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.1.1</version>
- </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1.1</version>
+ </dependency>
- <dependency>
- <groupId>org.jboss</groupId>
- <artifactId>jboss-common-core</artifactId>
- <version>2.2.10.GA</version>
- </dependency>
+ <dependency>
+ <groupId>org.jboss</groupId>
+ <artifactId>jboss-common-core</artifactId>
+ <version>2.2.10.GA</version>
+ </dependency>
- <!-- optional dependencies -->
- <dependency>
- <groupId>jdbm</groupId>
- <artifactId>jdbm</artifactId>
- <version>1.0</version>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>c3p0</groupId>
- <artifactId>c3p0</artifactId>
- <version>0.9.1.2</version>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>sleepycat</groupId>
- <artifactId>je</artifactId>
- <version>3.2.43</version>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>net.jcip</groupId>
- <artifactId>jcip-annotations</artifactId>
- <version>1.0</version>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>net.noderunner</groupId>
- <artifactId>amazon-s3</artifactId>
- <version>1.0.0.0</version>
- <optional>true</optional>
- </dependency>
+ <!-- optional dependencies -->
+ <dependency>
+ <groupId>jdbm</groupId>
+ <artifactId>jdbm</artifactId>
+ <version>1.0</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>c3p0</groupId>
+ <artifactId>c3p0</artifactId>
+ <version>0.9.1.2</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>sleepycat</groupId>
+ <artifactId>je</artifactId>
+ <version>3.2.43</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>net.jcip</groupId>
+ <artifactId>jcip-annotations</artifactId>
+ <version>1.0</version>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>net.noderunner</groupId>
+ <artifactId>amazon-s3</artifactId>
+ <version>1.0.0.0</version>
+ <optional>true</optional>
+ </dependency>
- <!-- test dependencies -->
- <dependency>
- <groupId>hsqldb</groupId>
- <artifactId>hsqldb</artifactId>
- <version>1.8.0.7</version>
- <scope>test</scope>
- </dependency>
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>1.8.0.7</version>
+ <scope>test</scope>
+ </dependency>
- <dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <version>2.4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>jboss.jbossts</groupId>
- <artifactId>jbossjta</artifactId>
- <version>4.4.0.GA</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>beanshell</groupId>
- <artifactId>bsh</artifactId>
- <version>2.0b4</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>net.noderunner</groupId>
- <artifactId>http</artifactId>
- <version>1.0</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- <version>2.5</version>
- <scope>test</scope>
- </dependency>
- <!-- 5.8 is needed for proper parallel test execution -->
- <dependency>
- <groupId>org.testng</groupId>
- <artifactId>testng</artifactId>
- <version>5.8</version>
- <scope>test</scope>
- <classifier>jdk15</classifier>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>2.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>jboss.jbossts</groupId>
+ <artifactId>jbossjta</artifactId>
+ <version>4.4.0.GA</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>beanshell</groupId>
+ <artifactId>bsh</artifactId>
+ <version>2.0b4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.noderunner</groupId>
+ <artifactId>http</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <version>2.5</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- 5.8 is needed for proper parallel test execution -->
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <version>5.8</version>
+ <scope>test</scope>
+ <classifier>jdk15</classifier>
+ </dependency>
+ </dependencies>
- <build>
- <plugins>
- <!-- ensure parallel test execution -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.4.3-JBOSS</version>
- <configuration>
- <parallel>tests</parallel>
- <threadCount>10</threadCount>
- <forkMode>none</forkMode>
- <systemProperties>
- <property>
- <name>jgroups.stack</name>
- <value>${protocol.stack}</value>
- </property>
- </systemProperties>
- <trimStackTrace>false</trimStackTrace>
- <properties>
- <property>
- <name>listener</name>
-
<value>org.jboss.cache.util.UnitTestTestNGListener</value>
- </property>
- </properties>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.2-beta-1</version>
- <executions>
- <execution>
- <id>assemble</id>
- <phase>install</phase>
+ <build>
+ <plugins>
+ <!-- ensure parallel test execution -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.4.3-JBOSS</version>
+ <configuration>
+ <parallel>tests</parallel>
+ <threadCount>10</threadCount>
+ <forkMode>none</forkMode>
+ <systemProperties>
+ <property>
+ <name>jgroups.stack</name>
+ <value>${protocol.stack}</value>
+ </property>
+ </systemProperties>
+ <trimStackTrace>false</trimStackTrace>
+ <properties>
+ <property>
+ <name>listener</name>
+
<value>org.jboss.cache.util.UnitTestTestNGListener</value>
+ </property>
+ </properties>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.2-beta-1</version>
+ <executions>
+ <execution>
+ <id>assemble</id>
+ <phase>install</phase>
+ <goals>
+ <goal>attached</goal>
+ </goals>
+ <configuration>
+ <descriptors>
+ <descriptor>assembly/bin.xml</descriptor>
+ <descriptor>assembly/doc.xml</descriptor>
+ <descriptor>assembly/all.xml</descriptor>
+ <descriptor>assembly/src.xml</descriptor>
+ </descriptors>
+
<finalName>${artifactId}-${jbosscache-core-version}</finalName>
+ <outputDirectory>target/distribution</outputDirectory>
+ <workDirectory>target/assembly/work</workDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+ <mainClass>org.jboss.cache.Version</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build-test-jar</id>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <manifest>
+
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
+
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
+ </manifest>
+ </archive>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ <reporting>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-report-plugin</artifactId>
+ <version>2.4.3-JBOSS</version>
+ </plugin>
+ </plugins>
+ </reporting>
+
+ <!-- basic JBoss repository so that the common parent POM in jbosscache-support can
be found -->
+ <repositories>
+ <repository>
+ <id>repository.jboss.org</id>
+ <
url>http://repository.jboss.org/maven2</url>
+ </repository>
+ <repository>
+ <id>snapshots.jboss.org</id>
+ <
url>http://snapshots.jboss.org/maven2</url>
+ </repository>
+ <!-- For Amazon S3 artifacts -->
+ <repository>
+ <id>e-xml.sourceforge.net</id>
+ <
url>http://e-xml.sourceforge.net/maven2/repository</url>
+ </repository>
+ </repositories>
+
+ <profiles>
+ <profile>
+ <!-- This testMoreState generates Javadocs and the UserGuide, FAQs and
Tutorial in the "package" phase. -->
+ <id>Docs</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <properties>
+ <!-- override to generate javadocs in the "package" phase
-->
+ <javadocPhase>package</javadocPhase>
+ </properties>
+ <build>
+ <plugins>
+ <!-- the docbook generation plugin for the user guide -->
+ <plugin>
+ <groupId>org.jboss.maven.plugins</groupId>
+ <artifactId>maven-jdocbook-plugin</artifactId>
+ <version>2.0.0</version>
+ <extensions>true</extensions>
+ <dependencies>
+ <dependency>
+ <groupId>org.jboss</groupId>
+ <artifactId>jbossorg-docbook-xslt</artifactId>
+ <version>1.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss</groupId>
+ <artifactId>jbossorg-jdocbook-style</artifactId>
+ <version>1.1.0</version>
+ <type>jdocbook-style</type>
+ </dependency>
+ </dependencies>
+ <executions>
+
+ <!-- The User Guide-->
+ <execution>
+ <id>userguide_en</id>
+ <phase>package</phase>
<goals>
- <goal>attached</goal>
+ <goal>resources</goal>
+ <goal>generate</goal>
</goals>
<configuration>
- <descriptors>
- <descriptor>assembly/bin.xml</descriptor>
- <descriptor>assembly/doc.xml</descriptor>
- <descriptor>assembly/all.xml</descriptor>
- <descriptor>assembly/src.xml</descriptor>
- </descriptors>
-
<finalName>${artifactId}-${jbosscache-core-version}</finalName>
-
<outputDirectory>target/distribution</outputDirectory>
-
<workDirectory>target/assembly/work</workDirectory>
+
<sourceDocumentName>master.xml</sourceDocumentName>
+
<sourceDirectory>${basedir}/src/main/docbook/userguide/en</sourceDirectory>
+ <imageResource>
+
<directory>${basedir}/src/main/docbook/images</directory>
+ </imageResource>
+ <cssResource>
+
<directory>${basedir}/src/main/docbook/css</directory>
+ </cssResource>
+
<targetDirectory>${basedir}/target/docbook/userguide_en</targetDirectory>
+ <formats>
+ <format>
+ <formatName>pdf</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/pdf.xsl</stylesheetResource>
+ <finalName>userguide_en.pdf</finalName>
+ </format>
+ <format>
+ <formatName>html</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/xhtml.xsl</stylesheetResource>
+ <finalName>index.html</finalName>
+ </format>
+ <format>
+ <formatName>html_single</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/xhtml-single.xsl
+ </stylesheetResource>
+ <finalName>index.html</finalName>
+ </format>
+ </formats>
+ <options>
+ <xincludeSupported>false</xincludeSupported>
+ </options>
</configuration>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <configuration>
- <archive>
- <manifest>
-
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
-
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
- <mainClass>org.jboss.cache.Version</mainClass>
- </manifest>
- </archive>
- </configuration>
- <executions>
- <execution>
- <id>build-test-jar</id>
+ </execution>
+
+ <!-- The Tutorial -->
+ <execution>
+ <id>tutorial_en</id>
+ <phase>package</phase>
<goals>
- <goal>test-jar</goal>
+ <goal>resources</goal>
+ <goal>generate</goal>
</goals>
<configuration>
- <archive>
- <manifest>
-
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
-
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
- </manifest>
- </archive>
+
<sourceDocumentName>master.xml</sourceDocumentName>
+
<sourceDirectory>${basedir}/src/main/docbook/tutorial/en</sourceDirectory>
+ <imageResource>
+
<directory>${basedir}/src/main/docbook/images</directory>
+ </imageResource>
+ <cssResource>
+
<directory>${basedir}/src/main/docbook/css</directory>
+ </cssResource>
+
<targetDirectory>${basedir}/target/docbook/tutorial_en</targetDirectory>
+ <formats>
+ <format>
+ <formatName>pdf</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/pdf.xsl</stylesheetResource>
+ <finalName>tutorial_en.pdf</finalName>
+ </format>
+ <format>
+ <formatName>html</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/xhtml.xsl</stylesheetResource>
+ <finalName>index.html</finalName>
+ </format>
+ <format>
+ <formatName>html_single</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/xhtml-single.xsl
+ </stylesheetResource>
+ <finalName>index.html</finalName>
+ </format>
+ </formats>
+ <options>
+ <xincludeSupported>false</xincludeSupported>
+ </options>
</configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- <reporting>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-report-plugin</artifactId>
- <version>2.4.3-JBOSS</version>
- </plugin>
- </plugins>
- </reporting>
+ </execution>
- <!-- basic JBoss repository so that the common parent POM in jbosscache-support
can be found -->
- <repositories>
- <repository>
- <id>repository.jboss.org</id>
- <
url>http://repository.jboss.org/maven2</url>
- </repository>
- <repository>
- <id>snapshots.jboss.org</id>
- <
url>http://snapshots.jboss.org/maven2</url>
- </repository>
- <!-- For Amazon S3 artifacts -->
- <repository>
- <id>e-xml.sourceforge.net</id>
- <
url>http://e-xml.sourceforge.net/maven2/repository</url>
- </repository>
- </repositories>
+ <!-- the FAQs -->
+ <execution>
+ <id>faq_en</id>
+ <phase>package</phase>
+ <goals>
+ <goal>resources</goal>
+ <goal>generate</goal>
+ </goals>
+ <configuration>
+
<sourceDocumentName>master.xml</sourceDocumentName>
+
<sourceDirectory>${basedir}/src/main/docbook/faq/en</sourceDirectory>
+ <imageResource>
+
<directory>${basedir}/src/main/docbook/images</directory>
+ </imageResource>
+ <cssResource>
+
<directory>${basedir}/src/main/docbook/css</directory>
+ </cssResource>
+
<targetDirectory>${basedir}/target/docbook/faq_en</targetDirectory>
+ <formats>
+ <format>
+ <formatName>pdf</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/pdf.xsl</stylesheetResource>
+ <finalName>faq_en.pdf</finalName>
+ </format>
+ <format>
+ <formatName>html</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/xhtml.xsl</stylesheetResource>
+ <finalName>index.html</finalName>
+ </format>
+ <format>
+ <formatName>html_single</formatName>
+
<stylesheetResource>classpath:/xslt/org/jboss/xhtml-single.xsl
+ </stylesheetResource>
+ <finalName>index.html</finalName>
+ </format>
+ </formats>
+ <options>
+ <xincludeSupported>false</xincludeSupported>
+ </options>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
- <profiles>
- <profile>
- <!-- This testMoreState generates Javadocs and the UserGuide, FAQs and
Tutorial in the "package" phase. -->
- <id>Docs</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <properties>
- <!-- override to generate javadocs in the "package" phase
-->
- <javadocPhase>package</javadocPhase>
- </properties>
- <build>
- <plugins>
- <!-- the docbook generation plugin for the user guide -->
- <plugin>
- <groupId>org.jboss.maven.plugins</groupId>
- <artifactId>maven-jdocbook-plugin</artifactId>
- <version>2.0.0</version>
- <extensions>true</extensions>
- <dependencies>
- <dependency>
- <groupId>org.jboss</groupId>
-
<artifactId>jbossorg-docbook-xslt</artifactId>
- <version>1.1.0</version>
- </dependency>
- <dependency>
- <groupId>org.jboss</groupId>
-
<artifactId>jbossorg-jdocbook-style</artifactId>
- <version>1.1.0</version>
- <type>jdocbook-style</type>
- </dependency>
- </dependencies>
- <executions>
+ <profile>
+ <id>test-hudson</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <defaultTestGroup>functional,unit</defaultTestGroup>
+ <protocol.stack>tcp</protocol.stack>
+ </properties>
+ </profile>
- <!-- The User Guide-->
- <execution>
- <id>userguide_en</id>
- <phase>package</phase>
- <goals>
- <goal>resources</goal>
- <goal>generate</goal>
- </goals>
- <configuration>
-
<sourceDocumentName>master.xml</sourceDocumentName>
-
<sourceDirectory>${basedir}/src/main/docbook/userguide/en</sourceDirectory>
- <imageResource>
-
<directory>${basedir}/src/main/docbook/images</directory>
- </imageResource>
- <cssResource>
-
<directory>${basedir}/src/main/docbook/css</directory>
- </cssResource>
-
<targetDirectory>${basedir}/target/docbook/userguide_en</targetDirectory>
- <formats>
- <format>
- <formatName>pdf</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/pdf.xsl</stylesheetResource>
-
<finalName>userguide_en.pdf</finalName>
- </format>
- <format>
- <formatName>html</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/xhtml.xsl</stylesheetResource>
-
<finalName>index.html</finalName>
- </format>
- <format>
-
<formatName>html_single</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/xhtml-single.xsl
- </stylesheetResource>
-
<finalName>index.html</finalName>
- </format>
- </formats>
- <options>
-
<xincludeSupported>false</xincludeSupported>
- </options>
- </configuration>
- </execution>
+ <profile>
+ <id>test-functional</id>
+ <properties>
+ <defaultTestGroup>functional</defaultTestGroup>
+ <protocol.stack>tcp</protocol.stack>
+ </properties>
+ </profile>
- <!-- The Tutorial -->
- <execution>
- <id>tutorial_en</id>
- <phase>package</phase>
- <goals>
- <goal>resources</goal>
- <goal>generate</goal>
- </goals>
- <configuration>
-
<sourceDocumentName>master.xml</sourceDocumentName>
-
<sourceDirectory>${basedir}/src/main/docbook/tutorial/en</sourceDirectory>
- <imageResource>
-
<directory>${basedir}/src/main/docbook/images</directory>
- </imageResource>
- <cssResource>
-
<directory>${basedir}/src/main/docbook/css</directory>
- </cssResource>
-
<targetDirectory>${basedir}/target/docbook/tutorial_en</targetDirectory>
- <formats>
- <format>
- <formatName>pdf</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/pdf.xsl</stylesheetResource>
-
<finalName>tutorial_en.pdf</finalName>
- </format>
- <format>
- <formatName>html</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/xhtml.xsl</stylesheetResource>
-
<finalName>index.html</finalName>
- </format>
- <format>
-
<formatName>html_single</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/xhtml-single.xsl
- </stylesheetResource>
-
<finalName>index.html</finalName>
- </format>
- </formats>
- <options>
-
<xincludeSupported>false</xincludeSupported>
- </options>
- </configuration>
- </execution>
+ <profile>
+ <id>test-unit</id>
+ <properties>
+ <defaultTestGroup>unit</defaultTestGroup>
+ </properties>
+ </profile>
- <!-- the FAQs -->
- <execution>
- <id>faq_en</id>
- <phase>package</phase>
- <goals>
- <goal>resources</goal>
- <goal>generate</goal>
- </goals>
- <configuration>
-
<sourceDocumentName>master.xml</sourceDocumentName>
-
<sourceDirectory>${basedir}/src/main/docbook/faq/en</sourceDirectory>
- <imageResource>
-
<directory>${basedir}/src/main/docbook/images</directory>
- </imageResource>
- <cssResource>
-
<directory>${basedir}/src/main/docbook/css</directory>
- </cssResource>
-
<targetDirectory>${basedir}/target/docbook/faq_en</targetDirectory>
- <formats>
- <format>
- <formatName>pdf</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/pdf.xsl</stylesheetResource>
-
<finalName>faq_en.pdf</finalName>
- </format>
- <format>
- <formatName>html</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/xhtml.xsl</stylesheetResource>
-
<finalName>index.html</finalName>
- </format>
- <format>
-
<formatName>html_single</formatName>
-
<stylesheetResource>classpath:/xslt/org/jboss/xhtml-single.xsl
- </stylesheetResource>
-
<finalName>index.html</finalName>
- </format>
- </formats>
- <options>
-
<xincludeSupported>false</xincludeSupported>
- </options>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </profile>
+ <profile>
+ <id>test-jgroups</id>
+ <properties>
+ <defaultTestGroup>jgroups</defaultTestGroup>
+ </properties>
+ </profile>
- <profile>
- <id>test-hudson</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <properties>
- <defaultTestGroup>functional,unit</defaultTestGroup>
- <protocol.stack>tcp</protocol.stack>
- </properties>
- </profile>
+ <profile>
+ <id>test-transaction</id>
+ <properties>
+ <defaultTestGroup>transaction</defaultTestGroup>
+ </properties>
+ </profile>
- <profile>
- <id>test-functional</id>
- <properties>
- <defaultTestGroup>functional</defaultTestGroup>
- <protocol.stack>tcp</protocol.stack>
- </properties>
- </profile>
+ <profile>
+ <id>profiling</id>
+ <properties>
+ <defaultTestGroup>profiling</defaultTestGroup>
+ </properties>
+ </profile>
- <profile>
- <id>test-unit</id>
- <properties>
- <defaultTestGroup>unit</defaultTestGroup>
- </properties>
- </profile>
+ <profile>
+ <id>test-integration</id>
+ <properties>
+ <defaultTestGroup>integration</defaultTestGroup>
+ <protocol.stack>udp</protocol.stack>
+ </properties>
+ </profile>
- <profile>
- <id>test-jgroups</id>
- <properties>
- <defaultTestGroup>jgroups</defaultTestGroup>
- </properties>
- </profile>
- <profile>
- <id>test-transaction</id>
- <properties>
- <defaultTestGroup>transaction</defaultTestGroup>
- </properties>
- </profile>
-
- <profile>
- <id>profiling</id>
- <properties>
- <defaultTestGroup>profiling</defaultTestGroup>
- </properties>
- </profile>
-
- <profile>
- <id>test-integration</id>
- <properties>
- <defaultTestGroup>integration</defaultTestGroup>
- <protocol.stack>udp</protocol.stack>
- </properties>
- </profile>
-
-
- <profile>
- <id>JBossAS</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <properties>
-
<jbosscache-core-version>3.0.3-SNAPSHOT-JBossAS</jbosscache-core-version>
- <defaultTestGroup>functional,unit</defaultTestGroup>
- <protocol.stack>tcp</protocol.stack>
- </properties>
- <dependencies>
- <dependency>
- <groupId>jgroups</groupId>
- <artifactId>jgroups</artifactId>
- <version>2.6.7.GA</version>
- </dependency>
- <!-- Replaces javax.transaction/jta -->
- <dependency>
- <groupId>org.jboss.javaee</groupId>
- <artifactId>jboss-javaee</artifactId>
- <version>5.0.0.GA</version>
- </dependency>
- <dependency>
- <groupId>org.jboss</groupId>
- <artifactId>jboss-common-core</artifactId>
- <version>2.2.10.GA</version>
- </dependency>
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- <version>1.1.0.jboss</version>
- </dependency>
- <dependency>
- <groupId>jboss.jbossts</groupId>
- <artifactId>jbossjta</artifactId>
- <version>4.4.0.GA</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
- </profile>
- </profiles>
+ <profile>
+ <id>JBossAS</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <properties>
+
<jbosscache-core-version>3.0.3-SNAPSHOT-JBossAS</jbosscache-core-version>
+ <defaultTestGroup>functional,unit</defaultTestGroup>
+ <protocol.stack>tcp</protocol.stack>
+ </properties>
+ <dependencies>
+ <dependency>
+ <groupId>jgroups</groupId>
+ <artifactId>jgroups</artifactId>
+ <version>2.6.7.GA</version>
+ </dependency>
+ <!-- Replaces javax.transaction/jta -->
+ <dependency>
+ <groupId>org.jboss.javaee</groupId>
+ <artifactId>jboss-javaee</artifactId>
+ <version>5.0.0.GA</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss</groupId>
+ <artifactId>jboss-common-core</artifactId>
+ <version>2.2.10.GA</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ <version>1.1.0.jboss</version>
+ </dependency>
+ <dependency>
+ <groupId>jboss.jbossts</groupId>
+ <artifactId>jbossjta</artifactId>
+ <version>4.4.0.GA</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
Modified: core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-13
13:43:13 UTC (rev 7686)
+++ core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -21,29 +21,12 @@
*/
package org.jboss.cache;
-import java.net.URL;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
-
-import javax.transaction.TransactionManager;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.RuntimeConfig;
import org.jboss.cache.config.Configuration.NodeLockingScheme;
+import org.jboss.cache.config.RuntimeConfig;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
@@ -82,14 +65,26 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import javax.transaction.TransactionManager;
+import java.net.URL;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Manager that handles all RPC calls between JBoss Cache instances
*
* @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik
AT jboss DOT org)</a>
*/
@MBean(objectName = "RPCManager", description = "Manages RPC connections
to remote caches")
-public class RPCManagerImpl implements RPCManager
-{
+public class RPCManagerImpl implements RPCManager {
private Channel channel;
private final Log log = LogFactory.getLog(RPCManagerImpl.class);
private volatile List<Address> members;
@@ -138,8 +133,7 @@
public void setupDependencies(ChannelMessageListener messageListener, Configuration
configuration, Notifier notifier,
CacheSPI spi, Marshaller marshaller, TransactionTable
txTable,
TransactionManager txManager, InvocationContextContainer
container, InterceptorChain interceptorChain,
- ComponentRegistry componentRegistry, LockManager
lockManager)
- {
+ ComponentRegistry componentRegistry, LockManager
lockManager) {
this.messageListener = messageListener;
this.configuration = configuration;
this.notifier = notifier;
@@ -153,123 +147,49 @@
this.lockManager = lockManager;
}
- public class FlushTracker
- {
+ public class FlushTracker {
private final ReclosableLatch flushBlockGate = new ReclosableLatch();
private final AtomicInteger flushCompletionCount = new AtomicInteger();
- private final ReentrantReadWriteLock coordinationLock = new
ReentrantReadWriteLock();
private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
- public void block()
- {
+ public void block() {
flushBlockGate.close();
flushWaitGate.open();
}
- public void unblock()
- {
+ public void unblock() {
flushWaitGate.close();
flushCompletionCount.incrementAndGet();
flushBlockGate.open();
}
- public int getFlushCompletionCount()
- {
+ public int getFlushCompletionCount() {
return flushCompletionCount.get();
}
- public void lockProcessingLock()
- {
- if (! configuration.isNonBlockingStateTransfer())
- return;
-
- for (;;)
- {
- try
- {
- if
(!coordinationLock.readLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
- throw new TimeoutException("Could not obtain processing
lock");
-
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void unlockProcessingLock()
- {
- if (! configuration.isNonBlockingStateTransfer())
- return;
-
- coordinationLock.readLock().unlock();
- }
-
- public void lockSuspendProcessingLock()
- {
- if (! configuration.isNonBlockingStateTransfer())
- return;
-
- for (;;)
- {
- try
- {
- if
(!coordinationLock.writeLock().tryLock(configuration.getStateRetrievalTimeout(),
TimeUnit.MILLISECONDS))
- throw new TimeoutException("Could not obtain processing
lock");
-
- return;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void unlockSuspendProcessingLock()
- {
- if (! configuration.isNonBlockingStateTransfer())
- return;
-
- if (coordinationLock.isWriteLockedByCurrentThread())
- coordinationLock.writeLock().unlock();
- }
-
- public void waitForFlushCompletion(long timeout)
- {
- for (; ;)
- {
- try
- {
- if (channel.flushSupported() && !flushBlockGate.await(timeout,
TimeUnit.MILLISECONDS))
- {
+ public void waitForFlushCompletion(long timeout) {
+ for (; ;) {
+ try {
+ if (channel.flushSupported() && !flushBlockGate.await(timeout,
TimeUnit.MILLISECONDS)) {
throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
}
return;
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- public void waitForFlushStart(long timeout)
- {
- for (; ;)
- {
- try
- {
- if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS))
- {
+ public void waitForFlushStart(long timeout) {
+ for (; ;) {
+ try {
+ if (channel.flushSupported() && !flushWaitGate.await(timeout,
TimeUnit.MILLISECONDS)) {
throw new TimeoutException("State retrieval timed out waiting for
flush to block. (timeout = " + timeout + " millis) ");
}
return;
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@@ -279,11 +199,9 @@
// ------------ START: Lifecycle methods ------------
@Start(priority = 15)
- public void start()
- {
+ public void start() {
- switch (configuration.getCacheMode())
- {
+ switch (configuration.getCacheMode()) {
case LOCAL:
log.debug("cache mode is local, will not create the channel");
isInLocalMode = true;
@@ -295,157 +213,60 @@
case INVALIDATION_SYNC:
isInLocalMode = false;
isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null
&& configuration.getBuddyReplicationConfig().isEnabled();
- if (log.isDebugEnabled())
- {
+ if (log.isDebugEnabled()) {
log.debug("Cache mode is " + configuration.getCacheMode());
}
boolean fetchState = shouldFetchStateOnStartup();
- boolean nonBlocking = configuration.isNonBlockingStateTransfer();
- initialiseChannelAndRpcDispatcher(fetchState && !nonBlocking);
+ initialiseChannelAndRpcDispatcher(fetchState);
- if (!fetchState || nonBlocking)
- {
- try
- {
- // Allow commands to be ACKed during state transfer
- if (nonBlocking)
- {
- componentRegistry.setBlockInStarting(false);
- }
+ if (!fetchState) {
+ try {
channel.connect(configuration.getClusterName());
- if (log.isInfoEnabled())
- {
+ if (log.isInfoEnabled()) {
log.info("Cache local address is " + getLocalAddress());
}
}
- catch (ChannelException e)
- {
+ catch (ChannelException e) {
throw new CacheException("Unable to connect to JGroups
channel", e);
}
- if (!fetchState)
- {
+ if (!fetchState) {
return;
}
}
long start = System.currentTimeMillis();
- if (nonBlocking)
- {
- startNonBlockStateTransfer(getMembers());
- }
- else
- {
- try
- {
- channel.connect(configuration.getClusterName(), null, null,
configuration.getStateRetrievalTimeout());
- if (log.isInfoEnabled())
- {
- log.info("Cache local address is " + getLocalAddress());
- }
-
- if (getMembers().size() > 1)
- {
- messageListener.waitForState();
- }
+ try {
+ channel.connect(configuration.getClusterName(), null, null,
configuration.getStateRetrievalTimeout());
+ if (log.isInfoEnabled()) {
+ log.info("Cache local address is " + getLocalAddress());
}
- catch (ChannelException e)
- {
- throw new CacheException("Unable to connect to JGroups
channel", e);
- }
- catch (Exception ex)
- {
- // make sure we disconnect from the channel before we throw this
exception!
- // JBCACHE-761
- disconnect();
- throw new CacheException("Unable to fetch state on startup",
ex);
- }
- }
- if (log.isInfoEnabled())
- {
- log.info("state was retrieved successfully (in " +
(System.currentTimeMillis() - start) + " milliseconds)");
- }
- }
-
- }
-
- private void startNonBlockStateTransfer(List<Address> members)
- {
-
- if (members.size() < 2)
- {
- return;
- }
-
- boolean success = false;
-
- outer:
- for (int i = 0, wait = 1000; i < 5; i++)
- {
- for (Address member : members)
- {
- if (member.equals(getLocalAddress()))
- {
- continue;
- }
-
- try
- {
- if (log.isInfoEnabled())
- {
- log.info("Trying to fetch state from: " + member);
- }
- if (getState(null, member))
- {
+ if (getMembers().size() > 1) {
messageListener.waitForState();
- success = true;
- break outer;
}
}
- catch (Exception e)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Error while fetching state", e);
- }
+ catch (ChannelException e) {
+ throw new CacheException("Unable to connect to JGroups channel",
e);
}
- }
-
- if (!success)
- {
- if (log.isWarnEnabled())
- {
- log.warn("Could not find available peer for state, backing off and
retrying");
+ catch (Exception ex) {
+ // make sure we disconnect from the channel before we throw this
exception!
+ // JBCACHE-761
+ disconnect();
+ throw new CacheException("Unable to fetch state on startup",
ex);
}
- try
- {
- Thread.sleep(wait <<= 2);
+ if (log.isInfoEnabled()) {
+ log.info("state was retrieved successfully (in " +
(System.currentTimeMillis() - start) + " milliseconds)");
}
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
-
}
- if (!success)
- {
- disconnect();
- throw new CacheException("Unable to fetch state on startup");
- }
-
- componentRegistry.setBlockInStarting(true);
}
- public void disconnect()
- {
- if (channel != null && channel.isOpen())
- {
+ public void disconnect() {
+ if (channel != null && channel.isOpen()) {
log.info("Disconnecting and closing the Channel");
channel.disconnect();
channel.close();
@@ -453,21 +274,17 @@
}
@Stop(priority = 8)
- public void stop()
- {
- try
- {
+ public void stop() {
+ try {
disconnect();
}
- catch (Exception toLog)
- {
+ catch (Exception toLog) {
log.error("Problem closing channel; setting it to null", toLog);
}
channel = null;
configuration.getRuntimeConfig().setChannel(null);
- if (rpcDispatcher != null)
- {
+ if (rpcDispatcher != null) {
log.info("Stopping the RpcDispatcher");
rpcDispatcher.stop();
}
@@ -482,56 +299,41 @@
/**
* @return true if we need to fetch state on startup. I.e., initiate a state
transfer.
*/
- private boolean shouldFetchStateOnStartup()
- {
+ private boolean shouldFetchStateOnStartup() {
boolean loaderFetch = configuration.getCacheLoaderConfig() != null &&
configuration.getCacheLoaderConfig().isFetchPersistentState();
return !configuration.isInactiveOnStartup() && !isUsingBuddyReplication
&& (configuration.isFetchInMemoryState() || loaderFetch);
}
@SuppressWarnings("deprecation")
- private void initialiseChannelAndRpcDispatcher(boolean fetchState) throws
CacheException
- {
+ private void initialiseChannelAndRpcDispatcher(boolean fetchState) throws
CacheException {
channel = configuration.getRuntimeConfig().getChannel();
- if (channel == null)
- {
+ if (channel == null) {
// Try to create a multiplexer channel
channel = getMultiplexerChannel();
- if (channel != null)
- {
+ if (channel != null) {
ReflectionUtil.setValue(configuration, "accessible", true);
configuration.setUsingMultiplexer(true);
- if (log.isDebugEnabled())
- {
+ if (log.isDebugEnabled()) {
log.debug("Created Multiplexer Channel for cache cluster " +
configuration.getClusterName() + " using stack " +
configuration.getMultiplexerStack());
}
- }
- else
- {
- try
- {
- if (configuration.getJGroupsConfigFile() != null)
- {
+ } else {
+ try {
+ if (configuration.getJGroupsConfigFile() != null) {
URL u = configuration.getJGroupsConfigFile();
if (log.isTraceEnabled()) log.trace("Grabbing cluster properties
from " + u);
channel = new JChannel(u);
- }
- else if (configuration.getClusterConfig() == null)
- {
+ } else if (configuration.getClusterConfig() == null) {
log.debug("setting cluster properties to default value");
channel = new JChannel(configuration.getDefaultClusterConfig());
- }
- else
- {
- if (trace)
- {
+ } else {
+ if (trace) {
log.trace("Cache cluster properties: " +
configuration.getClusterConfig());
}
channel = new JChannel(configuration.getClusterConfig());
}
}
- catch (ChannelException e)
- {
+ catch (ChannelException e) {
throw new CacheException(e);
}
}
@@ -546,43 +348,35 @@
channel.setOpt(Channel.AUTO_GETSTATE, fetchState);
channel.setOpt(Channel.BLOCK, true);
- if (configuration.isUseRegionBasedMarshalling())
- {
+ if (configuration.isUseRegionBasedMarshalling()) {
rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener,
new MembershipListenerAdaptor(),
- spi, invocationContextContainer, interceptorChain, componentRegistry,
this);
- }
- else
- {
+ spi,
invocationContextContainer, interceptorChain, componentRegistry);
+ } else {
rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new
MembershipListenerAdaptor(),
- invocationContextContainer, invocationContextContainer, interceptorChain,
componentRegistry, this);
+ invocationContextContainer,
invocationContextContainer, interceptorChain, componentRegistry);
}
checkAppropriateConfig();
rpcDispatcher.setRequestMarshaller(marshaller);
rpcDispatcher.setResponseMarshaller(marshaller);
}
- public Channel getChannel()
- {
+ public Channel getChannel() {
return channel;
}
- private JChannel getMultiplexerChannel() throws CacheException
- {
+ private JChannel getMultiplexerChannel() throws CacheException {
String stackName = configuration.getMultiplexerStack();
RuntimeConfig rtc = configuration.getRuntimeConfig();
ChannelFactory channelFactory = rtc.getMuxChannelFactory();
JChannel muxchannel = null;
- if (channelFactory != null)
- {
- try
- {
+ if (channelFactory != null) {
+ try {
muxchannel = (JChannel) channelFactory.createMultiplexerChannel(stackName,
configuration.getClusterName());
}
- catch (Exception e)
- {
+ catch (Exception e) {
throw new CacheException("Failed to create multiplexed channel using
stack " + stackName, e);
}
}
@@ -592,21 +386,18 @@
@Deprecated
- private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
- {
+ private void removeLocksForDeadMembers(NodeSPI node, List deadMembers) {
Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
Object owner = lockManager.getWriteOwner(node);
if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction)
owner);
- for (Object readOwner : lockManager.getReadOwners(node))
- {
+ for (Object readOwner : lockManager.getReadOwners(node)) {
if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction)
readOwner);
}
- for (GlobalTransaction deadOwner : deadOwners)
- {
+ for (GlobalTransaction deadOwner : deadOwners) {
boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
boolean broken = LockUtil.breakTransactionLock(node.getFqn(), lockManager,
deadOwner, localTx, txTable, txManager);
@@ -614,8 +405,7 @@
}
// Recursively unlock children
- for (Object child : node.getChildrenDirect())
- {
+ for (Object child : node.getChildrenDirect()) {
removeLocksForDeadMembers((NodeSPI) child, deadMembers);
}
}
@@ -624,8 +414,7 @@
/**
* Only used with MVCC.
*/
- private void removeLocksForDeadMembers(InternalNode<?, ?> node, List
deadMembers)
- {
+ private void removeLocksForDeadMembers(InternalNode<?, ?> node, List
deadMembers) {
Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
Object owner = lockManager.getWriteOwner(node.getFqn());
@@ -633,8 +422,7 @@
// MVCC won't have any read locks.
- for (GlobalTransaction deadOwner : deadOwners)
- {
+ for (GlobalTransaction deadOwner : deadOwners) {
boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
boolean broken = LockUtil.breakTransactionLock(node.getFqn(), lockManager,
deadOwner, localTx, txTable, txManager);
@@ -645,11 +433,9 @@
for (InternalNode child : node.getChildren()) removeLocksForDeadMembers(child,
deadMembers);
}
- private boolean isLockOwnerDead(Object owner, List deadMembers)
- {
+ private boolean isLockOwnerDead(Object owner, List deadMembers) {
boolean result = false;
- if (owner != null && owner instanceof GlobalTransaction)
- {
+ if (owner != null && owner instanceof GlobalTransaction) {
Object addr = ((GlobalTransaction) owner).getAddress();
result = deadMembers.contains(addr);
}
@@ -660,70 +446,52 @@
// ------------ START: RPC call methods ------------
- public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, int mode, long timeout, boolean useOutOfBandMessage) throws
Exception
- {
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, int mode, long timeout, boolean useOutOfBandMessage) throws
Exception {
return callRemoteMethods(recipients, command, mode, timeout, null,
useOutOfBandMessage);
}
- public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, boolean synchronous, long timeout, boolean useOutOfBandMessage)
throws Exception
- {
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, boolean synchronous, long timeout, boolean useOutOfBandMessage)
throws Exception {
return callRemoteMethods(recipients, command, synchronous ? GroupRequest.GET_ALL :
GroupRequest.GET_NONE, timeout, useOutOfBandMessage);
}
- public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception
- {
+ public List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand command, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception {
boolean success = true;
- boolean unlock = false;
- try
- {
+ try {
// short circuit if we don't have an RpcDispatcher!
if (rpcDispatcher == null) return null;
int modeToUse = mode;
int preferredMode;
- if ((preferredMode =
spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1)
- {
+ if ((preferredMode =
spi.getInvocationContext().getOptionOverrides().getGroupRequestMode()) > -1) {
modeToUse = preferredMode;
}
- if (trace)
- {
+ if (trace) {
log.trace("callRemoteMethods(): valid members are " + recipients +
" methods: " + command + " Using OOB? " + useOutOfBandMessage + "
modeToUse: " + modeToUse);
}
- flushTracker.lockProcessingLock();
- unlock = true;
flushTracker.waitForFlushCompletion(configuration.getStateRetrievalTimeout());
useOutOfBandMessage = false;
RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command,
modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
- if (trace)
- {
+ if (trace) {
log.trace("(" + getLocalAddress() + "): responses for method
" + command.getClass().getSimpleName() + ":\n" + rsps);
}
// short-circuit no-return-value calls.
if (rsps == null) return Collections.emptyList();
List<Object> retval = new ArrayList<Object>(rsps.size());
- for (Rsp rsp : rsps.values())
- {
- if (rsp.wasSuspected() || !rsp.wasReceived())
- {
+ for (Rsp rsp : rsps.values()) {
+ if (rsp.wasSuspected() || !rsp.wasReceived()) {
CacheException ex;
- if (rsp.wasSuspected())
- {
+ if (rsp.wasSuspected()) {
ex = new SuspectException("Suspected member: " +
rsp.getSender());
- }
- else
- {
+ } else {
ex = new TimeoutException("Replication timeout for " +
rsp.getSender());
}
retval.add(new ReplicationException("rsp=" + rsp, ex));
success = false;
- }
- else
- {
+ } else {
Object value = rsp.getValue();
- if (value instanceof Exception && !(value instanceof
ReplicationException))
- {
+ if (value instanceof Exception && !(value instanceof
ReplicationException)) {
// if we have any application-level exceptions make sure we throw
them!!
if (trace) log.trace("Recieved exception'" + value +
"' from " + rsp.getSender());
throw (Exception) value;
@@ -734,45 +502,35 @@
}
return retval;
}
- catch (Exception e)
- {
+ catch (Exception e) {
success = false;
throw e;
}
- finally
- {
+ finally {
computeStats(success);
- if (unlock)
- flushTracker.unlockProcessingLock();
}
}
// ------------ START: Partial state transfer methods ------------
- public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn
integrationTarget) throws Exception
- {
+ public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn
integrationTarget) throws Exception {
String encodedStateId = sourceTarget +
DefaultStateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget;
fetchPartialState(sources, encodedStateId);
}
- public void fetchPartialState(List<Address> sources, Fqn subtree) throws
Exception
- {
- if (subtree == null)
- {
+ public void fetchPartialState(List<Address> sources, Fqn subtree) throws
Exception {
+ if (subtree == null) {
throw new IllegalArgumentException("Cannot fetch partial state. Null
subtree.");
}
fetchPartialState(sources, subtree.toString());
}
- private void fetchPartialState(List<Address> sources, String stateId) throws
Exception
- {
- if (sources == null || sources.isEmpty() || stateId == null)
- {
+ private void fetchPartialState(List<Address> sources, String stateId) throws
Exception {
+ if (sources == null || sources.isEmpty() || stateId == null) {
// should this really be throwing an exception? Are there valid use cases where
partial state may not be available? - Manik
// Yes -- cache is configured LOCAL but app doesn't know it -- Brian
//throw new IllegalArgumentException("Cannot fetch partial state, targets
are " + sources + " and stateId is " + stateId);
- if (log.isWarnEnabled())
- {
+ if (log.isWarnEnabled()) {
log.warn("Cannot fetch partial state, targets are " + sources +
" and stateId is " + stateId);
}
return;
@@ -783,68 +541,55 @@
//skip *this* node as a target
targets.remove(getLocalAddress());
- if (targets.isEmpty())
- {
+ if (targets.isEmpty()) {
// Definitely no exception here -- this happens every time the 1st node in the
// cluster activates a region!! -- Brian
if (log.isDebugEnabled()) log.debug("Cannot fetch partial state. There are
no target members specified");
return;
}
- if (log.isDebugEnabled())
- {
+ if (log.isDebugEnabled()) {
log.debug("Node " + getLocalAddress() + " fetching partial state
" + stateId + " from members " + targets);
}
boolean successfulTransfer = false;
- for (Address target : targets)
- {
- try
- {
- if (log.isDebugEnabled())
- {
+ for (Address target : targets) {
+ try {
+ if (log.isDebugEnabled()) {
log.debug("Node " + getLocalAddress() + " fetching partial
state " + stateId + " from member " + target);
}
messageListener.setStateSet(false);
successfulTransfer = getState(stateId, target);
- if (successfulTransfer)
- {
- try
- {
+ if (successfulTransfer) {
+ try {
messageListener.waitForState();
}
- catch (Exception transferFailed)
- {
+ catch (Exception transferFailed) {
if (log.isTraceEnabled()) log.trace("Error while fetching
state", transferFailed);
successfulTransfer = false;
}
}
- if (log.isDebugEnabled())
- {
+ if (log.isDebugEnabled()) {
log.debug("Node " + getLocalAddress() + " fetching partial
state " + stateId + " from member " + target + (successfulTransfer ? "
successful" : " failed"));
}
if (successfulTransfer) break;
}
- catch (IllegalStateException ise)
- {
+ catch (IllegalStateException ise) {
// thrown by the JGroups channel if state retrieval fails.
- if (log.isInfoEnabled())
- {
+ if (log.isInfoEnabled()) {
log.info("Channel problems fetching state. Continuing on to next
provider. ", ise);
}
}
}
- if (!successfulTransfer && log.isDebugEnabled())
- {
+ if (!successfulTransfer && log.isDebugEnabled()) {
log.debug("Node " + getLocalAddress() + " could not fetch partial
state " + stateId + " from any member " + targets);
}
}
- private boolean getState(String stateId, Address target) throws
ChannelNotConnectedException, ChannelClosedException
- {
+ private boolean getState(String stateId, Address target) throws
ChannelNotConnectedException, ChannelClosedException {
lastStateTransferSource = target;
- return ((JChannel) channel).getState(target, stateId,
configuration.getStateRetrievalTimeout(), !configuration.isNonBlockingStateTransfer());
+ return ((JChannel) channel).getState(target, stateId,
configuration.getStateRetrievalTimeout(), true);
}
@@ -853,65 +598,50 @@
// ------------ START: Informational methods ------------
@ManagedAttribute(description = "Local address")
- public String getLocalAddressString()
- {
+ public String getLocalAddressString() {
Address address = getLocalAddress();
return address == null ? "null" : address.toString();
}
- public Address getLastStateTransferSource()
- {
+ public Address getLastStateTransferSource() {
return lastStateTransferSource;
}
- public Address getLocalAddress()
- {
+ public Address getLocalAddress() {
return channel != null ? channel.getLocalAddress() : null;
}
@ManagedAttribute(description = "Cluster view")
- public String getMembersString()
- {
+ public String getMembersString() {
List l = getMembers();
return l == null ? "null" : l.toString();
}
- public List<Address> getMembers()
- {
+ public List<Address> getMembers() {
if (isInLocalMode) return null;
- if (members == null)
- {
+ if (members == null) {
return Collections.emptyList();
- }
- else
- {
+ } else {
return members;
}
}
- public boolean isCoordinator()
- {
+ public boolean isCoordinator() {
return coordinator;
}
- public Address getCoordinator()
- {
- if (channel == null)
- {
+ public Address getCoordinator() {
+ if (channel == null) {
return null;
}
- synchronized (coordinatorLock)
- {
- while (members == null || members.isEmpty())
- {
+ synchronized (coordinatorLock) {
+ while (members == null || members.isEmpty()) {
log.debug("getCoordinator(): waiting on viewAccepted()");
- try
- {
+ try {
coordinatorLock.wait();
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
log.error("getCoordinator(): Interrupted while waiting for members to
be set", e);
break;
}
@@ -924,22 +654,16 @@
/*----------------------- MembershipListener ------------------------*/
- protected class MembershipListenerAdaptor implements ExtendedMembershipListener
- {
+ protected class MembershipListenerAdaptor implements ExtendedMembershipListener {
- public void viewAccepted(View newView)
- {
- try
- {
+ public void viewAccepted(View newView) {
+ try {
Vector<Address> newMembers = newView.getMembers();
if (log.isInfoEnabled()) log.info("Received new cluster view: " +
newView);
- synchronized (coordinatorLock)
- {
+ synchronized (coordinatorLock) {
boolean needNotification = false;
- if (newMembers != null)
- {
- if (members != null)
- {
+ if (newMembers != null) {
+ if (members != null) {
// we had a membership list before this event. Check to make sure
we haven't lost any members,
// and if so, determine what members have been removed
// and roll back any tx and break any locks
@@ -947,16 +671,12 @@
removed.removeAll(newMembers);
spi.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
NodeSPI root = spi.getRoot();
- if (root != null)
- {
+ if (root != null) {
// UGH!!! What a shameless hack!
- if (configuration.getNodeLockingScheme() ==
NodeLockingScheme.MVCC)
- {
+ if (configuration.getNodeLockingScheme() ==
NodeLockingScheme.MVCC) {
removeLocksForDeadMembers(root.getDelegationTarget(),
removed);
- }
- else
- {
+ } else {
removeLocksForDeadMembers(root, removed);
}
}
@@ -971,8 +691,7 @@
coordinator = (members != null && members.size() != 0 &&
members.get(0).equals(getLocalAddress()));
// now notify listeners - *after* updating the coordinator. - JBCACHE-662
- if (needNotification && notifier != null)
- {
+ if (needNotification && notifier != null) {
InvocationContext ctx = spi.getInvocationContext();
notifier.notifyViewChange(newView, ctx);
}
@@ -981,8 +700,7 @@
coordinatorLock.notifyAll();
}
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
//do not rethrow! jgroups might behave funny, resulting even in deadlock
log.error("Error found while processing view accepted!!!", e);
}
@@ -991,17 +709,14 @@
/**
* Called when a member is suspected.
*/
- public void suspect(Address suspected_mbr)
- {
+ public void suspect(Address suspected_mbr) {
}
/**
* Indicates that a channel has received a BLOCK event from FLUSH protocol.
*/
- public void block()
- {
- try
- {
+ public void block() {
+ try {
if (log.isDebugEnabled()) log.debug("Block received at " +
getLocalAddress());
flushTracker.block();
@@ -1010,8 +725,7 @@
if (log.isDebugEnabled()) log.debug("Block processed at " +
getLocalAddress());
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
//do not rethrow! jgroups might behave funny, resulting even in deadlock
log.error("Error found while processing block()", e);
}
@@ -1020,10 +734,8 @@
/**
* Indicates that a channel has received a UNBLOCK event from FLUSH protocol.
*/
- public void unblock()
- {
- try
- {
+ public void unblock() {
+ try {
if (log.isDebugEnabled()) log.debug("UnBlock received at " +
getLocalAddress());
notifier.notifyCacheUnblocked(true);
@@ -1032,8 +744,7 @@
if (log.isDebugEnabled()) log.debug("UnBlock processed at " +
getLocalAddress());
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
//do not rethrow! jgroups might behave funny, resulting even in deadlock
log.error("Error found while processing unblock", e);
}
@@ -1042,63 +753,50 @@
}
//jmx operations
- private void computeStats(boolean success)
- {
- if (statisticsEnabled && rpcDispatcher != null)
- {
- if (success)
- {
+ private void computeStats(boolean success) {
+ if (statisticsEnabled && rpcDispatcher != null) {
+ if (success) {
replicationCount++;
- }
- else
- {
+ } else {
replicationFailures++;
}
}
}
@ManagedOperation
- public void resetStatistics()
- {
+ public void resetStatistics() {
this.replicationCount = 0;
this.replicationFailures = 0;
}
@ManagedAttribute(description = "number of successful replications")
- public long getReplicationCount()
- {
+ public long getReplicationCount() {
return replicationCount;
}
@ManagedAttribute(description = "number of failed replications")
- public long getReplicationFailures()
- {
+ public long getReplicationFailures() {
return replicationFailures;
}
@ManagedAttribute(description = "whether or not jmx statistics are
enabled")
- public boolean isStatisticsEnabled()
- {
+ public boolean isStatisticsEnabled() {
return statisticsEnabled;
}
@ManagedAttribute(description = "whether or not the RPCManager is used in this
cache instance")
- public boolean isEnabled()
- {
+ public boolean isEnabled() {
return !isInLocalMode;
}
@ManagedAttribute
- public void setStatisticsEnabled(boolean statisticsEnabled)
- {
+ public void setStatisticsEnabled(boolean statisticsEnabled) {
this.statisticsEnabled = statisticsEnabled;
}
@ManagedAttribute(description = "RPC call success ratio")
- public String getSuccessRatio()
- {
- if (replicationCount == 0 || !statisticsEnabled)
- {
+ public String getSuccessRatio() {
+ if (replicationCount == 0 || !statisticsEnabled) {
return "N/A";
}
double totalCount = replicationCount + replicationFailures;
@@ -1109,40 +807,33 @@
/**
* Checks to see whether the cache is using an appropriate JGroups config.
*/
- private void checkAppropriateConfig()
- {
+ private void checkAppropriateConfig() {
//if we use a shared transport do not log any warn message
- if (configuration.getMultiplexerStack() != null)
- {
+ if (configuration.getMultiplexerStack() != null) {
return;
}
//bundling is not good for sync caches
Configuration.CacheMode cacheMode = configuration.getCacheMode();
- if (!cacheMode.equals(Configuration.CacheMode.LOCAL) &&
configuration.getCacheMode().isSynchronous())
- {
+ if (!cacheMode.equals(Configuration.CacheMode.LOCAL) &&
configuration.getCacheMode().isSynchronous()) {
ProtocolStack stack = ((JChannel) channel).getProtocolStack();
TP transport = stack.getTransport();
- if (transport.isEnableBundling())
- {
+ if (transport.isEnableBundling()) {
log.warn("You have enabled jgroups's message bundling, which is not
recommended for sync replication. If there is no particular " +
"reason for this we strongly recommend to disable message bundling
in JGroups config (enable_bundling=\"false\").");
}
}
//bundling is good for async caches
- if (!cacheMode.isSynchronous())
- {
+ if (!cacheMode.isSynchronous()) {
ProtocolStack stack = ((JChannel) channel).getProtocolStack();
TP transport = stack.getTransport();
- if (!transport.isEnableBundling())
- {
+ if (!transport.isEnableBundling()) {
log.warn("You have disabled jgroups's message bundling, which is not
recommended for async replication. If there is no particular " +
"reason for this we strongly recommend to enable message bundling
in JGroups config (enable_bundling=\"true\").");
}
}
}
- public FlushTracker getFlushTracker()
- {
+ public FlushTracker getFlushTracker() {
return flushTracker;
}
}
Modified: core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/Version.java
===================================================================
--- core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/Version.java 2009-02-13 13:43:13 UTC
(rev 7686)
+++ core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/Version.java 2009-02-13 14:43:12 UTC
(rev 7687)
@@ -30,12 +30,11 @@
* @version $Id$
*/
@Immutable
-public class Version
-{
- public static final String version = "3.1.0-SNAPSHOT";
- public static final String codename = "Cascabel";
+public class Version {
+ public static final String version = "3.0.3.CR1";
+ public static final String codename = "Naga";
//public static final String cvs = "$Id$";
- static final byte[] version_id = {'0', '3', '1', '0',
'S'};
+ static final byte[] version_id = {'0', '3', '0', '3',
'C', 'R', '1'};
private static final int MAJOR_SHIFT = 11;
private static final int MINOR_SHIFT = 6;
@@ -49,8 +48,7 @@
/**
* Prints version information.
*/
- public static void main(String[] args)
- {
+ public static void main(String[] args) {
System.out.println("\nVersion: \t" + version);
System.out.println("Codename: \t" + codename);
//System.out.println("CVS: \t" + cvs);
@@ -60,83 +58,64 @@
/**
* Returns version information as a string.
*/
- public static String printVersion()
- {
+ public static String printVersion() {
return "JBossCache '" + codename + "' " + version;// +
"[ " + cvs + "]";
}
- public static String printVersionId(byte[] v, int len)
- {
+ public static String printVersionId(byte[] v, int len) {
StringBuilder sb = new StringBuilder();
- if (v != null)
- {
- if (len <= 0)
- {
+ if (v != null) {
+ if (len <= 0) {
len = v.length;
}
- for (int i = 0; i < len; i++)
- {
+ for (int i = 0; i < len; i++) {
sb.append((char) v[i]);
}
}
return sb.toString();
}
- public static String printVersionId(byte[] v)
- {
+ public static String printVersionId(byte[] v) {
StringBuilder sb = new StringBuilder();
- if (v != null)
- {
+ if (v != null) {
for (byte aV : v) sb.append((char) aV);
}
return sb.toString();
}
- public static boolean compareTo(byte[] v)
- {
- if (v == null)
- {
+ public static boolean compareTo(byte[] v) {
+ if (v == null) {
return false;
}
- if (v.length < version_id.length)
- {
+ if (v.length < version_id.length) {
return false;
}
- for (int i = 0; i < version_id.length; i++)
- {
- if (version_id[i] != v[i])
- {
+ for (int i = 0; i < version_id.length; i++) {
+ if (version_id[i] != v[i]) {
return false;
}
}
return true;
}
- public static int getLength()
- {
+ public static int getLength() {
return version_id.length;
}
- public static short getVersionShort()
- {
+ public static short getVersionShort() {
return getVersionShort(version);
}
- public static short getVersionShort(String versionString)
- {
- if (versionString == null)
- {
+ public static short getVersionShort(String versionString) {
+ if (versionString == null) {
throw new IllegalArgumentException("versionString is null");
}
// Special cases for version prior to 1.2.4.SP2
- if ("1.2.4".equals(versionString))
- {
+ if ("1.2.4".equals(versionString)) {
return 124;
- }
- else if ("1.2.4.SP1".equals(versionString))
- {
+ } else if ("1.2.4.SP1".equals(versionString)) {
return 1241;
}
@@ -144,30 +123,24 @@
int a = 0;
int b = 0;
int c = 0;
- if (parts.length > 0)
- {
+ if (parts.length > 0) {
a = Integer.parseInt(parts[0]);
}
- if (parts.length > 1)
- {
+ if (parts.length > 1) {
b = Integer.parseInt(parts[1]);
}
- if (parts.length > 2)
- {
+ if (parts.length > 2) {
c = Integer.parseInt(parts[2]);
}
return encodeVersion(a, b, c);
}
- public static String getVersionString(short versionShort)
- {
- if (versionShort == SHORT_1_2_4_SP2)
- {
+ public static String getVersionString(short versionShort) {
+ if (versionShort == SHORT_1_2_4_SP2) {
return "1.2.4.SP2";
}
- switch (versionShort)
- {
+ switch (versionShort) {
case 124:
return "1.2.4";
case 1241:
@@ -177,33 +150,28 @@
}
}
- public static short encodeVersion(int major, int minor, int patch)
- {
+ public static short encodeVersion(int major, int minor, int patch) {
return (short) ((major << MAJOR_SHIFT)
+ (minor << MINOR_SHIFT)
+ patch);
}
- public static String decodeVersion(short version)
- {
+ public static String decodeVersion(short version) {
int major = (version & MAJOR_MASK) >> MAJOR_SHIFT;
int minor = (version & MINOR_MASK) >> MINOR_SHIFT;
int patch = (version & PATCH_MASK);
return major + "." + minor + "." + patch;
}
- public static boolean isBefore124(short version)
- {
+ public static boolean isBefore124(short version) {
return (version > 1241 && version <= SHORT_1_2_3);
}
/**
* Retroweaver version info.
*/
- public static class Retro
- {
- public static void main(String[] args)
- {
+ public static class Retro {
+ public static void main(String[] args) {
System.out.println("\nVersion: \t" + version + " (Retroweaved for
JDK 1.4.x compatibility)");
System.out.println("Codename: \t" + codename);
//System.out.println("CVS: \t" + cvs);
Modified: core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/config/Configuration.java 2009-02-13
13:43:13 UTC (rev 7686)
+++ core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/config/Configuration.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -41,8 +41,7 @@
* @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik
AT jboss DOT org)</a>
*/
@NonVolatile
-public class Configuration extends ConfigurationComponent
-{
+public class Configuration extends ConfigurationComponent {
private static final long serialVersionUID = 5553791890144997466L;
private Marshaller marshaller;
@@ -54,8 +53,7 @@
/**
* Behavior of the JVM shutdown hook registered by the cache
*/
- public static enum ShutdownHookBehavior
- {
+ public static enum ShutdownHookBehavior {
/**
* By default a shutdown hook is registered if no MBean server (apart from the JDK
default) is detected.
*/
@@ -73,8 +71,7 @@
/**
* Cache replication mode.
*/
- public static enum CacheMode
- {
+ public static enum CacheMode {
/**
* Data is not replicated.
*/
@@ -103,22 +100,18 @@
/**
* Returns true if the mode is invalidation, either sync or async.
*/
- public boolean isInvalidation()
- {
+ public boolean isInvalidation() {
return this == INVALIDATION_SYNC || this == INVALIDATION_ASYNC;
}
- public boolean isSynchronous()
- {
+ public boolean isSynchronous() {
return this == REPL_SYNC || this == INVALIDATION_SYNC || this == LOCAL;
}
}
- public static CacheMode legacyModeToCacheMode(int legacyMode)
- {
- switch (legacyMode)
- {
+ public static CacheMode legacyModeToCacheMode(int legacyMode) {
+ switch (legacyMode) {
case 1:
return CacheMode.LOCAL;
case 2:
@@ -138,8 +131,7 @@
/**
* Cache node locking scheme.
*/
- public static enum NodeLockingScheme
- {
+ public static enum NodeLockingScheme {
/**
* Data is locked using the MVCC locking scheme. This is the default locking
scheme in JBoss Cache 3.0.0.
*
@@ -149,7 +141,8 @@
/**
* Data is exclusively locked during modification.
*
- * @see <a
href="http://en.wikipedia.org/wiki/Concurrency_control">http...
(pessimistic)</a>
+ * @see <a
href="http://en.wikipedia.org/wiki/Concurrency_control">http...
+ * (pessimistic)</a>
*/
PESSIMISTIC,
/**
@@ -162,8 +155,7 @@
/**
* @return true if the node locking scheme uses versioning.
*/
- public boolean isVersionedScheme()
- {
+ public boolean isVersionedScheme() {
return this == OPTIMISTIC;
}
}
@@ -187,7 +179,6 @@
private boolean exposeManagementStatistics = true;
@Dynamic
private boolean fetchInMemoryState = true;
- private boolean nonBlockingStateTransfer = false;
private short replicationVersion = DEFAULT_REPLICATION_VERSION;
@Dynamic
private long lockAcquisitionTimeout = 10000;
@@ -230,13 +221,10 @@
private int serializationExecutorQueueSize = 50000;
@Start(priority = 1)
- void correctIsolationLevels()
- {
+ void correctIsolationLevels() {
// ensure the correct isolation level upgrades and/or downgrades are performed.
- if (nodeLockingScheme == NodeLockingScheme.MVCC)
- {
- switch (isolationLevel)
- {
+ if (nodeLockingScheme == NodeLockingScheme.MVCC) {
+ switch (isolationLevel) {
case NONE:
case READ_UNCOMMITTED:
isolationLevel = IsolationLevel.READ_COMMITTED;
@@ -252,34 +240,28 @@
// SETTERS - MAKE SURE ALL SETTERS PERFORM testImmutability()!!!
//
------------------------------------------------------------------------------------------------------------
- public void setCacheMarshaller(Marshaller instance)
- {
+ public void setCacheMarshaller(Marshaller instance) {
marshaller = instance;
}
- public Marshaller getMarshaller()
- {
+ public Marshaller getMarshaller() {
return marshaller;
}
- public boolean isWriteSkewCheck()
- {
+ public boolean isWriteSkewCheck() {
return writeSkewCheck;
}
- public void setWriteSkewCheck(boolean writeSkewCheck)
- {
+ public void setWriteSkewCheck(boolean writeSkewCheck) {
testImmutability("writeSkewCheck");
this.writeSkewCheck = writeSkewCheck;
}
- public int getConcurrencyLevel()
- {
+ public int getConcurrencyLevel() {
return concurrencyLevel;
}
- public void setConcurrencyLevel(int concurrencyLevel)
- {
+ public void setConcurrencyLevel(int concurrencyLevel) {
testImmutability("concurrencyLevel");
this.concurrencyLevel = concurrencyLevel;
}
@@ -287,177 +269,152 @@
/**
* Converts a list of elements to a Java Groups property string.
*/
- public void setClusterConfig(Element config)
- {
+ public void setClusterConfig(Element config) {
setClusterConfig(jGroupsStackParser.parseClusterConfigXml(config));
}
- public void setClusterName(String clusterName)
- {
+ public void setClusterName(String clusterName) {
testImmutability("clusterName");
this.clusterName = clusterName;
}
- public void setClusterConfig(String clusterConfig)
- {
+ public void setClusterConfig(String clusterConfig) {
testImmutability("clusterConfig");
this.clusterConfig = clusterConfig;
}
- public void setReplQueueMaxElements(int replQueueMaxElements)
- {
+ public void setReplQueueMaxElements(int replQueueMaxElements) {
testImmutability("replQueueMaxElements");
this.replQueueMaxElements = replQueueMaxElements;
}
- public void setReplQueueInterval(long replQueueInterval)
- {
+ public void setReplQueueInterval(long replQueueInterval) {
testImmutability("replQueueInterval");
this.replQueueInterval = replQueueInterval;
}
- public void setExposeManagementStatistics(boolean useMbean)
- {
+ public void setExposeManagementStatistics(boolean useMbean) {
testImmutability("exposeManagementStatistics");
this.exposeManagementStatistics = useMbean;
}
/**
- * Enables invocation batching if set to <tt>true</tt>. You still need to
use {@link org.jboss.cache.Cache#startBatch()}
- * and {@link org.jboss.cache.Cache#endBatch(boolean)} to demarcate the start and end
of batches.
+ * Enables invocation batching if set to <tt>true</tt>. You still need to
use {@link
+ * org.jboss.cache.Cache#startBatch()} and {@link
org.jboss.cache.Cache#endBatch(boolean)} to demarcate the start and
+ * end of batches.
*
* @param enabled if true, batching is enabled.
* @since 3.0
*/
- public void setInvocationBatchingEnabled(boolean enabled)
- {
+ public void setInvocationBatchingEnabled(boolean enabled) {
testImmutability("invocationBatchingEnabled");
this.invocationBatchingEnabled = enabled;
}
- public void setFetchInMemoryState(boolean fetchInMemoryState)
- {
+ public void setFetchInMemoryState(boolean fetchInMemoryState) {
testImmutability("fetchInMemoryState");
this.fetchInMemoryState = fetchInMemoryState;
}
- public void setReplicationVersion(short replicationVersion)
- {
+ public void setReplicationVersion(short replicationVersion) {
testImmutability("replicationVersion");
this.replicationVersion = replicationVersion;
}
- public void setReplVersionString(String replVersionString)
- {
+ public void setReplVersionString(String replVersionString) {
setReplicationVersion(replVersionString == null ? 0 :
Version.getVersionShort(replVersionString));
}
- public void setLockAcquisitionTimeout(long lockAcquisitionTimeout)
- {
+ public void setLockAcquisitionTimeout(long lockAcquisitionTimeout) {
testImmutability("lockAcquisitionTimeout");
this.lockAcquisitionTimeout = lockAcquisitionTimeout;
}
- public void setSyncReplTimeout(long syncReplTimeout)
- {
+ public void setSyncReplTimeout(long syncReplTimeout) {
testImmutability("syncReplTimeout");
this.syncReplTimeout = syncReplTimeout;
}
- public void setCacheMode(CacheMode cacheModeInt)
- {
+ public void setCacheMode(CacheMode cacheModeInt) {
testImmutability("cacheMode");
this.cacheMode = cacheModeInt;
}
- public void setCacheMode(String cacheMode)
- {
+ public void setCacheMode(String cacheMode) {
testImmutability("cacheMode");
if (cacheMode == null) throw new ConfigurationException("Cache mode cannot be
null", "CacheMode");
this.cacheMode = CacheMode.valueOf(uc(cacheMode));
- if (this.cacheMode == null)
- {
+ if (this.cacheMode == null) {
log.warn("Unknown cache mode '" + cacheMode + "', using
defaults.");
this.cacheMode = CacheMode.LOCAL;
}
}
- public String getCacheModeString()
- {
+ public String getCacheModeString() {
return cacheMode == null ? null : cacheMode.toString();
}
- public void setCacheModeString(String cacheMode)
- {
+ public void setCacheModeString(String cacheMode) {
setCacheMode(cacheMode);
}
- public void setInactiveOnStartup(boolean inactiveOnStartup)
- {
+ public void setInactiveOnStartup(boolean inactiveOnStartup) {
testImmutability("inactiveOnStartup");
this.inactiveOnStartup = inactiveOnStartup;
}
- public EvictionConfig getEvictionConfig()
- {
+ public EvictionConfig getEvictionConfig() {
return evictionConfig;
}
- public void setEvictionConfig(EvictionConfig config)
- {
+ public void setEvictionConfig(EvictionConfig config) {
testImmutability("evictionConfig");
this.evictionConfig = config;
}
/**
- * This is a deprecated configuration option. While it will be supported for the 2.x
series for backward compatibility,
- * expect to see it disappear in 3.x.
+ * This is a deprecated configuration option. While it will be supported for the 2.x
series for backward
+ * compatibility, expect to see it disappear in 3.x.
* <p/>
- * With {@link #isUseLazyDeserialization()}, which is enabled by default, custom class
loaders are handled implicitly.
- * See the user guide for details on how this is handled.
+ * With {@link #isUseLazyDeserialization()}, which is enabled by default, custom class
loaders are handled
+ * implicitly. See the user guide for details on how this is handled.
* <p/>
*/
- public void setUseRegionBasedMarshalling(boolean useRegionBasedMarshalling)
- {
+ public void setUseRegionBasedMarshalling(boolean useRegionBasedMarshalling) {
testImmutability("useRegionBasedMarshalling");
this.useRegionBasedMarshalling = useRegionBasedMarshalling;
}
- public void setTransactionManagerLookupClass(String transactionManagerLookupClass)
- {
+ public void setTransactionManagerLookupClass(String transactionManagerLookupClass) {
testImmutability("transactionManagerLookupClass");
this.transactionManagerLookupClass = transactionManagerLookupClass;
}
- public void setCacheLoaderConfig(CacheLoaderConfig config)
- {
+ public void setCacheLoaderConfig(CacheLoaderConfig config) {
testImmutability("cacheLoaderConfig");
replaceChildConfig(this.cacheLoaderConfig, config);
this.cacheLoaderConfig = config;
}
- public void setSyncCommitPhase(boolean syncCommitPhase)
- {
+ public void setSyncCommitPhase(boolean syncCommitPhase) {
testImmutability("syncCommitPhase");
this.syncCommitPhase = syncCommitPhase;
}
- public void setSyncRollbackPhase(boolean syncRollbackPhase)
- {
+ public void setSyncRollbackPhase(boolean syncRollbackPhase) {
testImmutability("syncRollbackPhase");
this.syncRollbackPhase = syncRollbackPhase;
}
/**
* Sets the size of the asynchronous listener notification thread pool size. Defaults
to 1, and if set to below 1,
- * all async listeners (specified with {@link
org.jboss.cache.notifications.annotation.CacheListener#sync()} are notified
- * synchronously.
+ * all async listeners (specified with {@link
org.jboss.cache.notifications.annotation.CacheListener#sync()} are
+ * notified synchronously.
*
* @param listenerAsyncPoolSize number of threads in pool
* @since 3.0
*/
- public void setListenerAsyncPoolSize(int listenerAsyncPoolSize)
- {
+ public void setListenerAsyncPoolSize(int listenerAsyncPoolSize) {
testImmutability("listenerAsyncPoolSize");
this.listenerAsyncPoolSize = listenerAsyncPoolSize;
}
@@ -467,8 +424,7 @@
*
* @param listenerAsyncQueueSize queue size to use
*/
- public void setListenerAsyncQueueSize(int listenerAsyncQueueSize)
- {
+ public void setListenerAsyncQueueSize(int listenerAsyncQueueSize) {
testImmutability("listenerAsyncQueueSize");
this.listenerAsyncQueueSize = listenerAsyncQueueSize;
}
@@ -478,163 +434,138 @@
*
* @param serializationExecutorQueueSize queue size to use
*/
- public void setSerializationExecutorQueueSize(int serializationExecutorQueueSize)
- {
+ public void setSerializationExecutorQueueSize(int serializationExecutorQueueSize) {
testImmutability("serializationExecutorQueueSize");
this.serializationExecutorQueueSize = serializationExecutorQueueSize;
}
- public void setBuddyReplicationConfig(BuddyReplicationConfig config)
- {
+ public void setBuddyReplicationConfig(BuddyReplicationConfig config) {
testImmutability("buddyReplicationConfig");
replaceChildConfig(this.buddyReplicationConfig, config);
this.buddyReplicationConfig = config;
}
/**
- * @deprecated will default to MVCC once optimistic and pessimistic schemes are
removed.
* @param nodeLockingScheme
+ * @deprecated will default to MVCC once optimistic and pessimistic schemes are
removed.
*/
@Deprecated
- public void setNodeLockingScheme(NodeLockingScheme nodeLockingScheme)
- {
+ public void setNodeLockingScheme(NodeLockingScheme nodeLockingScheme) {
testImmutability("nodeLockingScheme");
testImmutability("nodeLockingOptimistic");
this.nodeLockingScheme = nodeLockingScheme;
}
- public void setUseReplQueue(boolean useReplQueue)
- {
+ public void setUseReplQueue(boolean useReplQueue) {
testImmutability("useReplQueue");
this.useReplQueue = useReplQueue;
}
- public void setIsolationLevel(IsolationLevel isolationLevel)
- {
+ public void setIsolationLevel(IsolationLevel isolationLevel) {
testImmutability("isolationLevel");
this.isolationLevel = isolationLevel;
}
/**
- * Starting with 3.x there are 3 locking schemes, so if true is passed in then state
is not defined.
- * It is here for backward compatibility reasons only and should not be used by new
code.
+ * Starting with 3.x there are 3 locking schemes, so if true is passed in then state
is not defined. It is here for
+ * backward compatibility reasons only and should not be used by new code.
*/
@Deprecated
- public void setNodeLockingOptimistic(boolean nodeLockingOptimistic)
- {
+ public void setNodeLockingOptimistic(boolean nodeLockingOptimistic) {
testImmutability("nodeLockingOptimistic");
if (nodeLockingOptimistic) setNodeLockingScheme(NodeLockingScheme.OPTIMISTIC);
else setNodeLockingScheme(NodeLockingScheme.PESSIMISTIC);
}
- public void setStateRetrievalTimeout(long stateRetrievalTimeout)
- {
+ public void setStateRetrievalTimeout(long stateRetrievalTimeout) {
testImmutability("stateRetrievalTimeout");
this.stateRetrievalTimeout = stateRetrievalTimeout;
}
- public void setNodeLockingScheme(String nodeLockingScheme)
- {
+ public void setNodeLockingScheme(String nodeLockingScheme) {
testImmutability("nodeLockingScheme");
- if (nodeLockingScheme == null)
- {
+ if (nodeLockingScheme == null) {
throw new ConfigurationException("Node locking scheme cannot be null",
"NodeLockingScheme");
}
this.nodeLockingScheme = NodeLockingScheme.valueOf(uc(nodeLockingScheme));
- if (this.nodeLockingScheme == null)
- {
+ if (this.nodeLockingScheme == null) {
log.warn("Unknown node locking scheme '" + nodeLockingScheme +
"', using defaults.");
this.nodeLockingScheme = NodeLockingScheme.PESSIMISTIC;
}
}
- public String getNodeLockingSchemeString()
- {
+ public String getNodeLockingSchemeString() {
return nodeLockingScheme == null ? null : nodeLockingScheme.toString();
}
- public void setNodeLockingSchemeString(String nodeLockingScheme)
- {
+ public void setNodeLockingSchemeString(String nodeLockingScheme) {
setNodeLockingScheme(nodeLockingScheme);
}
- private static String uc(String s)
- {
+ private static String uc(String s) {
return s.toUpperCase(Locale.ENGLISH);
}
- public void setIsolationLevel(String isolationLevel)
- {
+ public void setIsolationLevel(String isolationLevel) {
testImmutability("isolationLevel");
if (isolationLevel == null) throw new ConfigurationException("Isolation level
cannot be null", "IsolationLevel");
this.isolationLevel = IsolationLevel.valueOf(uc(isolationLevel));
- if (this.isolationLevel == null)
- {
+ if (this.isolationLevel == null) {
log.warn("Unknown isolation level '" + isolationLevel +
"', using defaults.");
this.isolationLevel = IsolationLevel.REPEATABLE_READ;
}
}
- public String getIsolationLevelString()
- {
+ public String getIsolationLevelString() {
return isolationLevel == null ? null : isolationLevel.toString();
}
- public void setIsolationLevelString(String isolationLevel)
- {
+ public void setIsolationLevelString(String isolationLevel) {
setIsolationLevel(isolationLevel);
}
/**
- * Sets whether inserting or removing a node requires a write lock
- * on the node's parent (when pessimistic locking is used.)
+ * Sets whether inserting or removing a node requires a write lock on the node's
parent (when pessimistic locking is
+ * used.)
* <p/>
* The default value is <code>false</code>
*/
- public void setLockParentForChildInsertRemove(boolean lockParentForChildInsertRemove)
- {
+ public void setLockParentForChildInsertRemove(boolean lockParentForChildInsertRemove)
{
testImmutability("lockParentForChildInsertRemove");
this.lockParentForChildInsertRemove = lockParentForChildInsertRemove;
}
- public void setMultiplexerStack(String stackName)
- {
+ public void setMultiplexerStack(String stackName) {
testImmutability("muxStackName");
this.muxStackName = stackName;
}
- public boolean isUsingMultiplexer()
- {
+ public boolean isUsingMultiplexer() {
return usingMultiplexer;
}
- public void setUsingMultiplexer(boolean usingMultiplexer)
- {
+ public void setUsingMultiplexer(boolean usingMultiplexer) {
testImmutability("usingMultiplexer");
this.usingMultiplexer = usingMultiplexer;
}
- public void setShutdownHookBehavior(ShutdownHookBehavior shutdownHookBehavior)
- {
+ public void setShutdownHookBehavior(ShutdownHookBehavior shutdownHookBehavior) {
testImmutability("shutdownHookBehavior");
this.shutdownHookBehavior = shutdownHookBehavior;
}
- public void setShutdownHookBehavior(String shutdownHookBehavior)
- {
+ public void setShutdownHookBehavior(String shutdownHookBehavior) {
testImmutability("shutdownHookBehavior");
if (shutdownHookBehavior == null)
throw new ConfigurationException("Shutdown hook behavior cannot be
null", "ShutdownHookBehavior");
this.shutdownHookBehavior =
ShutdownHookBehavior.valueOf(uc(shutdownHookBehavior));
- if (this.shutdownHookBehavior == null)
- {
+ if (this.shutdownHookBehavior == null) {
log.warn("Unknown shutdown hook behavior '" + shutdownHookBehavior
+ "', using defaults.");
this.shutdownHookBehavior = ShutdownHookBehavior.DEFAULT;
}
}
- public void setUseLazyDeserialization(boolean useLazyDeserialization)
- {
+ public void setUseLazyDeserialization(boolean useLazyDeserialization) {
testImmutability("useLazyDeserialization");
this.useLazyDeserialization = useLazyDeserialization;
}
@@ -645,8 +576,7 @@
* @param objectInputStreamPoolSize
* @since 2.1.0
*/
- public void setObjectInputStreamPoolSize(int objectInputStreamPoolSize)
- {
+ public void setObjectInputStreamPoolSize(int objectInputStreamPoolSize) {
testImmutability("objectInputStreamPoolSize");
this.objectInputStreamPoolSize = objectInputStreamPoolSize;
}
@@ -657,8 +587,7 @@
* @param objectOutputStreamPoolSize
* @since 2.1.0
*/
- public void setObjectOutputStreamPoolSize(int objectOutputStreamPoolSize)
- {
+ public void setObjectOutputStreamPoolSize(int objectOutputStreamPoolSize) {
testImmutability("objectOutputStreamPoolSize");
this.objectOutputStreamPoolSize = objectOutputStreamPoolSize;
}
@@ -669,8 +598,7 @@
*
* @param serializationExecutorPoolSize number of threads to use
*/
- public void setSerializationExecutorPoolSize(int serializationExecutorPoolSize)
- {
+ public void setSerializationExecutorPoolSize(int serializationExecutorPoolSize) {
testImmutability("serializationExecutorPoolSize");
this.serializationExecutorPoolSize = serializationExecutorPoolSize;
}
@@ -680,8 +608,7 @@
//
------------------------------------------------------------------------------------------------------------
- public ShutdownHookBehavior getShutdownHookBehavior()
- {
+ public ShutdownHookBehavior getShutdownHookBehavior() {
return this.shutdownHookBehavior;
}
@@ -692,33 +619,27 @@
* @deprecated use {@link #getNodeLockingScheme()} to determine node locking scheme
used.
*/
@Deprecated
- public boolean isNodeLockingOptimistic()
- {
+ public boolean isNodeLockingOptimistic() {
return nodeLockingScheme == NodeLockingScheme.OPTIMISTIC;
}
- public boolean isUseReplQueue()
- {
+ public boolean isUseReplQueue() {
return useReplQueue;
}
- public String getClusterName()
- {
+ public String getClusterName() {
return clusterName;
}
- public String getClusterConfig()
- {
+ public String getClusterConfig() {
return clusterConfig;
}
- public int getReplQueueMaxElements()
- {
+ public int getReplQueueMaxElements() {
return replQueueMaxElements;
}
- public long getReplQueueInterval()
- {
+ public long getReplQueueInterval() {
return replQueueInterval;
}
@@ -726,13 +647,11 @@
* @deprecated use isExposeManagementStatistics()
*/
@Deprecated
- public boolean getExposeManagementStatistics()
- {
+ public boolean getExposeManagementStatistics() {
return exposeManagementStatistics;
}
- public boolean isExposeManagementStatistics()
- {
+ public boolean isExposeManagementStatistics() {
return exposeManagementStatistics;
}
@@ -740,160 +659,132 @@
* @return true if invocation batching is enabled.
* @since 3.0
*/
- public boolean isInvocationBatchingEnabled()
- {
+ public boolean isInvocationBatchingEnabled() {
return invocationBatchingEnabled;
}
- public boolean isFetchInMemoryState()
- {
+ public boolean isFetchInMemoryState() {
return fetchInMemoryState;
}
- public short getReplicationVersion()
- {
+ public short getReplicationVersion() {
return replicationVersion;
}
- public String getReplVersionString()
- {
+ public String getReplVersionString() {
return Version.getVersionString(replicationVersion);
}
- public long getLockAcquisitionTimeout()
- {
+ public long getLockAcquisitionTimeout() {
return lockAcquisitionTimeout;
}
- public long getSyncReplTimeout()
- {
+ public long getSyncReplTimeout() {
return syncReplTimeout;
}
- public CacheMode getCacheMode()
- {
+ public CacheMode getCacheMode() {
return cacheMode;
}
- public boolean isInactiveOnStartup()
- {
+ public boolean isInactiveOnStartup() {
return inactiveOnStartup;
}
- public IsolationLevel getIsolationLevel()
- {
+ public IsolationLevel getIsolationLevel() {
return isolationLevel;
}
/**
- * Gets whether inserting or removing a node requires a write lock
- * on the node's parent (when pessimistic locking is used.)
+ * Gets whether inserting or removing a node requires a write lock on the node's
parent (when pessimistic locking is
+ * used.)
* <p/>
* The default value is <code>false</code>
*/
- public boolean isLockParentForChildInsertRemove()
- {
+ public boolean isLockParentForChildInsertRemove() {
return lockParentForChildInsertRemove;
}
- public boolean isUseRegionBasedMarshalling()
- {
+ public boolean isUseRegionBasedMarshalling() {
return useRegionBasedMarshalling;
}
- public String getTransactionManagerLookupClass()
- {
+ public String getTransactionManagerLookupClass() {
return transactionManagerLookupClass;
}
- public CacheLoaderConfig getCacheLoaderConfig()
- {
+ public CacheLoaderConfig getCacheLoaderConfig() {
return cacheLoaderConfig;
}
- public boolean isSyncCommitPhase()
- {
+ public boolean isSyncCommitPhase() {
return syncCommitPhase;
}
- public boolean isSyncRollbackPhase()
- {
+ public boolean isSyncRollbackPhase() {
return syncRollbackPhase;
}
/**
* Gets the size of the asynchronous listener notification thread pool size. Defaults
to 1, and if set to below 1,
- * all async listeners (specified with {@link
org.jboss.cache.notifications.annotation.CacheListener#sync()} are notified
- * synchronously.
+ * all async listeners (specified with {@link
org.jboss.cache.notifications.annotation.CacheListener#sync()} are
+ * notified synchronously.
*
* @return thread pool size
* @since 3.0
*/
- public int getListenerAsyncPoolSize()
- {
+ public int getListenerAsyncPoolSize() {
return listenerAsyncPoolSize;
}
- public BuddyReplicationConfig getBuddyReplicationConfig()
- {
+ public BuddyReplicationConfig getBuddyReplicationConfig() {
return buddyReplicationConfig;
}
/**
- * @deprecated will be removed once optimistic and pessimistic locking is removed.
* @return node locking scheme in use
+ * @deprecated will be removed once optimistic and pessimistic locking is removed.
*/
@Deprecated
- public NodeLockingScheme getNodeLockingScheme()
- {
+ public NodeLockingScheme getNodeLockingScheme() {
return nodeLockingScheme;
}
- public long getStateRetrievalTimeout()
- {
+ public long getStateRetrievalTimeout() {
return stateRetrievalTimeout;
}
- public String getMultiplexerStack()
- {
+ public String getMultiplexerStack() {
return muxStackName;
}
- public boolean isUseLazyDeserialization()
- {
+ public boolean isUseLazyDeserialization() {
return useLazyDeserialization;
}
- public synchronized RuntimeConfig getRuntimeConfig()
- {
- if (runtimeConfig == null)
- {
+ public synchronized RuntimeConfig getRuntimeConfig() {
+ if (runtimeConfig == null) {
setRuntimeConfig(new RuntimeConfig(), false);
}
return runtimeConfig;
}
- public void setRuntimeConfig(RuntimeConfig runtimeConfig)
- {
+ public void setRuntimeConfig(RuntimeConfig runtimeConfig) {
setRuntimeConfig(runtimeConfig, true);
}
- private void setRuntimeConfig(RuntimeConfig runtimeConfig, boolean testImmutability)
- {
- if (testImmutability)
- {
+ private void setRuntimeConfig(RuntimeConfig runtimeConfig, boolean testImmutability)
{
+ if (testImmutability) {
testImmutability("runtimeConfig");
}
this.runtimeConfig = runtimeConfig;
}
- public String getMarshallerClass()
- {
+ public String getMarshallerClass() {
return marshallerClass;
}
- public void setMarshallerClass(String marshallerClass)
- {
+ public void setMarshallerClass(String marshallerClass) {
this.marshallerClass = marshallerClass;
}
@@ -901,8 +792,7 @@
* @return the size of he object input stream pool
* @since 2.1.0
*/
- public int getObjectInputStreamPoolSize()
- {
+ public int getObjectInputStreamPoolSize() {
return objectInputStreamPoolSize;
}
@@ -910,8 +800,7 @@
* @return the size of he object output stream pool
* @since 2.1.0
*/
- public int getObjectOutputStreamPoolSize()
- {
+ public int getObjectOutputStreamPoolSize() {
return objectOutputStreamPoolSize;
}
@@ -921,46 +810,38 @@
*
* @return a default JGroups config file
*/
- public URL getDefaultClusterConfig()
- {
+ public URL getDefaultClusterConfig() {
URL url = getClass().getClassLoader().getResource("flush-udp.xml");
if (log.isTraceEnabled()) log.trace("Using default JGroups configuration file
" + url);
return url;
}
- public URL getJGroupsConfigFile()
- {
+ public URL getJGroupsConfigFile() {
return jgroupsConfigFile;
}
- public void setJgroupsConfigFile(URL jgroupsConfigFile)
- {
+ public void setJgroupsConfigFile(URL jgroupsConfigFile) {
this.jgroupsConfigFile = jgroupsConfigFile;
}
/**
* @return the serialization executor pool size.
*/
- public int getSerializationExecutorPoolSize()
- {
+ public int getSerializationExecutorPoolSize() {
return serializationExecutorPoolSize;
}
/**
- *
* @return the bounded queue size for async listeners
*/
- public int getListenerAsyncQueueSize()
- {
+ public int getListenerAsyncQueueSize() {
return listenerAsyncQueueSize;
}
/**
- *
* @return the bounded queue size for async serializers
*/
- public int getSerializationExecutorQueueSize()
- {
+ public int getSerializationExecutorQueueSize() {
return serializationExecutorQueueSize;
}
@@ -973,8 +854,7 @@
//
------------------------------------------------------------------------------------------------------------
@Override
- public boolean equals(Object o)
- {
+ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
@@ -1027,8 +907,7 @@
}
@Override
- public int hashCode()
- {
+ public int hashCode() {
int result;
result = (marshaller != null ? marshaller.hashCode() : 0);
result = 31 * result + (clusterName != null ? clusterName.hashCode() : 0);
@@ -1071,23 +950,18 @@
}
@Override
- public Configuration clone() throws CloneNotSupportedException
- {
+ public Configuration clone() throws CloneNotSupportedException {
Configuration c = (Configuration) super.clone();
- if (buddyReplicationConfig != null)
- {
+ if (buddyReplicationConfig != null) {
c.setBuddyReplicationConfig(buddyReplicationConfig.clone());
}
- if (evictionConfig != null)
- {
+ if (evictionConfig != null) {
c.setEvictionConfig(evictionConfig.clone());
}
- if (cacheLoaderConfig != null)
- {
+ if (cacheLoaderConfig != null) {
c.setCacheLoaderConfig(cacheLoaderConfig.clone());
}
- if (runtimeConfig != null)
- {
+ if (runtimeConfig != null) {
c.setRuntimeConfig(runtimeConfig.clone());
// always make sure we reset the runtime when cloning.
c.getRuntimeConfig().reset();
@@ -1095,24 +969,20 @@
return c;
}
- public boolean isUsingCacheLoaders()
- {
+ public boolean isUsingCacheLoaders() {
return getCacheLoaderConfig() != null &&
!getCacheLoaderConfig().getIndividualCacheLoaderConfigs().isEmpty();
}
- public boolean isUsingBuddyReplication()
- {
+ public boolean isUsingBuddyReplication() {
return getBuddyReplicationConfig() != null &&
getBuddyReplicationConfig().isEnabled() &&
getCacheMode() != Configuration.CacheMode.LOCAL;
}
- public String getMuxStackName()
- {
+ public String getMuxStackName() {
return muxStackName;
}
- public void setMuxStackName(String muxStackName)
- {
+ public void setMuxStackName(String muxStackName) {
this.muxStackName = muxStackName;
}
@@ -1123,33 +993,19 @@
* @return List of cutom interceptors, never null
*/
@SuppressWarnings("unchecked")
- public List<CustomInterceptorConfig> getCustomInterceptors()
- {
+ public List<CustomInterceptorConfig> getCustomInterceptors() {
return customInterceptors == null ? Collections.EMPTY_LIST : customInterceptors;
}
/**
* @see #getCustomInterceptors()
*/
- public void setCustomInterceptors(List<CustomInterceptorConfig>
customInterceptors)
- {
+ public void setCustomInterceptors(List<CustomInterceptorConfig>
customInterceptors) {
testImmutability("customInterceptors");
this.customInterceptors = customInterceptors;
}
- public BuddyManager getConsistentHashing()
- {
+ public BuddyManager getConsistentHashing() {
return null;
}
-
- public boolean isNonBlockingStateTransfer()
- {
- return nonBlockingStateTransfer;
- }
-
- public void setNonBlockingStateTransfer(boolean nonBlockingStateTransfer)
- {
- this.nonBlockingStateTransfer = nonBlockingStateTransfer;
- }
-
}
Modified:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -51,36 +51,33 @@
import java.util.Set;
/**
- * A registry where components which have been created are stored. Components are stored
as singletons, registered under
- * a specific name.
+ * A registry where components which have been created are stored. Components are stored
as singletons, registered
+ * under a specific name.
* <p/>
* Components can be retrieved from the registry using {@link #getComponent(Class)}.
* <p/>
* Components can be registered using {@link #registerComponent(Object, Class)}, which
will cause any dependencies to be
- * wired in as well. Components that need to be created as a result of wiring will be
done using {@link #getOrCreateComponent(Class)},
- * which will look up the default factory for the component type (factories annotated
with the appropriate {@link DefaultFactoryFor} annotation.
+ * wired in as well. Components that need to be created as a result of wiring will be
done using {@link
+ * #getOrCreateComponent(Class)}, which will look up the default factory for the
component type (factories annotated
+ * with the appropriate {@link DefaultFactoryFor} annotation.
* <p/>
* Default factories are treated as components too and will need to be wired before being
used.
* <p/>
- * The registry can exist in one of several states, as defined by the {@link CacheStatus}
enumeration. In terms of the cache,
- * state changes in the following manner:
- * <ul>
- * <li>INSTANTIATED - when first constructed</li>
- * <li>CONSTRUCTED - when created using the DefaultCacheFactory</li>
- * <li>When {@link org.jboss.cache.Cache#create()} is called, the components are
rewired.</li>
- * <li>STARTED - when {@link org.jboss.cache.Cache#start()} is called</li>
- * <li>STOPPED - when {@link org.jboss.cache.Cache#stop()} is called</li>
- * <li>DESTROYED - when {@link org.jboss.cache.Cache#destroy()} is
called.</li>
- * </ul>
+ * The registry can exist in one of several states, as defined by the {@link CacheStatus}
enumeration. In terms of the
+ * cache, state changes in the following manner: <ul> <li>INSTANTIATED - when
first constructed</li> <li>CONSTRUCTED -
+ * when created using the DefaultCacheFactory</li> <li>When {@link
org.jboss.cache.Cache#create()} is called, the
+ * components are rewired.</li> <li>STARTED - when {@link
org.jboss.cache.Cache#start()} is called</li> <li>STOPPED -
+ * when {@link org.jboss.cache.Cache#stop()} is called</li> <li>DESTROYED -
when {@link org.jboss.cache.Cache#destroy()}
+ * is called.</li> </ul>
* <p/>
- * Cache configuration can only be changed and will only be reinjected if the cache is
not in the {@link org.jboss.cache.CacheStatus#STARTED} state.
+ * Cache configuration can only be changed and will only be reinjected if the cache is
not in the {@link
+ * org.jboss.cache.CacheStatus#STARTED} state.
*
* @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik
AT jboss DOT org</a>)
* @since 2.1.0
*/
@NonVolatile
-public class ComponentRegistry implements Lifecycle
-{
+public class ComponentRegistry implements Lifecycle {
/**
* Contains class definitions of component factories that can be used to construct
certain components
*/
@@ -112,18 +109,15 @@
*
* @param configuration configuration with which this is created
*/
- public ComponentRegistry(Configuration configuration, CacheSPI cache)
- {
- try
- {
+ public ComponentRegistry(Configuration configuration, CacheSPI cache) {
+ try {
// bootstrap.
registerDefaultClassLoader(null);
registerComponent(this, ComponentRegistry.class);
registerComponent(configuration, Configuration.class);
registerComponent(new BootstrapFactory(cache, configuration, this),
BootstrapFactory.class);
}
- catch (Exception e)
- {
+ catch (Exception e) {
throw new CacheException("Unable to construct ComponentRegistry", e);
}
}
@@ -133,23 +127,21 @@
*
* @return state of the registry
*/
- public CacheStatus getState()
- {
+ public CacheStatus getState() {
return state;
}
/**
- * Wires an object instance with dependencies annotated with the {@link
org.jboss.cache.factories.annotations.Inject} annotation, creating more components
- * as needed based on the Configuration passed in if these additional components
don't exist in the
- * {@link ComponentRegistry}. Strictly for components that don't otherwise live
in the registry and have a lifecycle, such as Nodes.
+ * Wires an object instance with dependencies annotated with the {@link
org.jboss.cache.factories.annotations.Inject}
+ * annotation, creating more components as needed based on the Configuration passed in
if these additional components
+ * don't exist in the {@link ComponentRegistry}. Strictly for components that
don't otherwise live in the registry
+ * and have a lifecycle, such as Nodes.
*
* @param target object to wire
* @throws ConfigurationException if there is a problem wiring the instance
*/
- public void wireDependencies(Object target) throws ConfigurationException
- {
- try
- {
+ public void wireDependencies(Object target) throws ConfigurationException {
+ try {
// don't use the reflection cache for wireDependencies calls since these are
not managed by the ComponentRegistry
// and may be invoked at any time, even after the cache starts.
List<Method> methods = ReflectionUtil.getAllMethods(target.getClass(),
Inject.class);
@@ -157,8 +149,7 @@
// search for anything we need to inject
for (Method method : methods) invokeInjectionMethod(target, method);
}
- catch (Exception e)
- {
+ catch (Exception e) {
throw new ConfigurationException("Unable to configure component (type:
" + target.getClass() + ", instance " + target + ")", e);
}
}
@@ -168,26 +159,26 @@
* typically called by bootstrap code. Defensively, it is called in the constructor
of ComponentRegistry with a null
* parameter.
*
- * @param loader a class loader to use by default. If this is null, the class loader
used to load this instance of ComponentRegistry is used.
+ * @param loader a class loader to use by default. If this is null, the class loader
used to load this instance of
+ * ComponentRegistry is used.
*/
- public void registerDefaultClassLoader(ClassLoader loader)
- {
+ public void registerDefaultClassLoader(ClassLoader loader) {
registerComponent(loader == null ? getClass().getClassLoader() : loader,
ClassLoader.class);
// make sure the class loader is non-volatile, so it survives restarts.
componentLookup.get(ClassLoader.class.getName()).nonVolatile = true;
}
/**
- * This is hard coded for now, since scanning the classpath for factories annotated
with {@link org.jboss.cache.factories.annotations.DefaultFactoryFor}
- * does not work with all class loaders. This is a temporary solution until a more
elegant one can be designed.
+ * This is hard coded for now, since scanning the classpath for factories annotated
with {@link
+ * org.jboss.cache.factories.annotations.DefaultFactoryFor} does not work with all
class loaders. This is a
+ * temporary solution until a more elegant one can be designed.
* <p/>
* BE SURE TO ADD ANY NEW FACTORY TYPES ANNOTATED WITH DefaultFactoryFor TO THIS
SET!!
* <p/>
*
* @return set of known factory types.
*/
- private Set<Class<? extends ComponentFactory>> getHardcodedFactories()
- {
+ private Set<Class<? extends ComponentFactory>> getHardcodedFactories() {
Set<Class<? extends ComponentFactory>> s = new HashSet<Class<?
extends ComponentFactory>>();
s.add(BootstrapFactory.class);
s.add(BuddyManagerFactory.class);
@@ -204,7 +195,6 @@
s.add(RegionManagerFactory.class);
s.add(NodeMetaFactory.class);
s.add(CommandsMetaFactory.class);
- s.add(TransactionLogFactory.class);
return s;
}
@@ -215,16 +205,13 @@
* @param component component to register
* @param type type of component
*/
- public void registerComponent(Object component, Class type)
- {
+ public void registerComponent(Object component, Class type) {
String name = type.getName();
Component old = componentLookup.get(name);
- if (old != null && old.instance.equals(component))
- {
- if (trace)
- {
+ if (old != null && old.instance.equals(component)) {
+ if (trace) {
log.trace("Attempting to register a component equal to one that already
exists under the same name (" + name + "). Not doing anything.");
}
return;
@@ -232,17 +219,14 @@
Component c;
- if (old != null)
- {
+ if (old != null) {
if (trace) log.trace("Replacing old component " + old + " with
new instance " + component);
old.instance = component;
old.methodsScanned = false;
c = old;
if (state == CacheStatus.STARTED) populateLifecycleMethods();
- }
- else
- {
+ } else {
c = new Component();
c.name = name;
c.instance = component;
@@ -260,8 +244,7 @@
*
* @param c component to add dependencies to
*/
- protected void addComponentDependencies(Component c)
- {
+ protected void addComponentDependencies(Component c) {
Class type = c.instance.getClass();
List<Method> methods = ReflectionUtil.getAllMethods(type, Inject.class);
c.injectionMethods.clear();
@@ -269,13 +252,11 @@
}
@SuppressWarnings("unchecked")
- protected void invokeInjectionMethod(Object o, Method m)
- {
+ protected void invokeInjectionMethod(Object o, Method m) {
Class[] dependencies = m.getParameterTypes();
Object[] params = new Object[dependencies.length];
- for (int i = 0; i < dependencies.length; i++)
- {
+ for (int i = 0; i < dependencies.length; i++) {
params[i] = getOrCreateComponent(dependencies[i]);
}
@@ -284,35 +265,33 @@
/**
* Retrieves a component if one exists, and if not, attempts to find a factory capable
of constructing the component
- * (factories annotated with the {@link DefaultFactoryFor} annotation that is capable
of creating the component class).
+ * (factories annotated with the {@link DefaultFactoryFor} annotation that is capable
of creating the component
+ * class).
* <p/>
- * If an instance needs to be constructed, dependencies are then automatically wired
into the instance, based on methods
- * on the component type annotated with {@link Inject}.
+ * If an instance needs to be constructed, dependencies are then automatically wired
into the instance, based on
+ * methods on the component type annotated with {@link Inject}.
* <p/>
- * Summing it up, component retrieval happens in the following order:<br />
- * 1. Look for a component that has already been created and registered.
- * 2. Look for an appropriate component that exists in the {@link Configuration} that
may be injected from an external system.
- * 3. Look for a class definition passed in to the {@link
org.jboss.cache.config.Configuration} - such as an EvictionPolicy implementation
- * 4. Attempt to create it by looking for an appropriate factory (annotated with
{@link DefaultFactoryFor})
+ * Summing it up, component retrieval happens in the following order:<br /> 1.
Look for a component that has already
+ * been created and registered. 2. Look for an appropriate component that exists in
the {@link Configuration} that
+ * may be injected from an external system. 3. Look for a class definition passed in
to the {@link
+ * org.jboss.cache.config.Configuration} - such as an EvictionPolicy implementation 4.
Attempt to create it by
+ * looking for an appropriate factory (annotated with {@link DefaultFactoryFor})
* <p/>
*
* @param componentClass type of component to be retrieved. Should not be null.
* @return a fully wired component instance, or null if one cannot be found or
constructed.
* @throws ConfigurationException if there is a problem with consructing or wiring the
instance.
*/
- protected <T> T getOrCreateComponent(Class<T> componentClass)
- {
+ protected <T> T getOrCreateComponent(Class<T> componentClass) {
T component = getComponent(componentClass);
- if (component == null)
- {
+ if (component == null) {
// first see if this has been injected externally.
component = getFromConfiguration(componentClass);
boolean attemptedFactoryConstruction = false;
- if (component == null)
- {
+ if (component == null) {
// create this component and add it to the registry
ComponentFactory factory = getFactory(componentClass);
component = factory.construct(componentClass);
@@ -320,12 +299,9 @@
}
- if (component != null)
- {
+ if (component != null) {
registerComponent(component, componentClass);
- }
- else if (attemptedFactoryConstruction)
- {
+ } else if (attemptedFactoryConstruction) {
if (trace) log.trace("Registering a null for component " +
componentClass.getSimpleName());
registerNullComponent(componentClass);
}
@@ -335,28 +311,24 @@
}
/**
- * Retrieves a component factory instance capable of constructing components of a
specified type. If the factory doesn't
- * exist in the registry, one is created.
+ * Retrieves a component factory instance capable of constructing components of a
specified type. If the factory
+ * doesn't exist in the registry, one is created.
*
* @param componentClass type of component to construct
* @return component factory capable of constructing such components
*/
- protected ComponentFactory getFactory(Class componentClass)
- {
+ protected ComponentFactory getFactory(Class componentClass) {
if (defaultFactories == null) scanDefaultFactories();
Class<? extends ComponentFactory> cfClass =
defaultFactories.get(componentClass);
- if (cfClass == null)
- {
+ if (cfClass == null) {
throw new ConfigurationException("No registered default factory for
component " + componentClass + " found!");
}
// a component factory is a component too! See if one has been created and exists
in the registry
ComponentFactory cf = getComponent(cfClass);
- if (cf == null)
- {
+ if (cf == null) {
// hasn't yet been created. Create and put in registry
cf = instantiateFactory(cfClass);
- if (cf == null)
- {
+ if (cf == null) {
throw new ConfigurationException("Unable to locate component factory for
component " + componentClass);
}
// we simply register this factory. Registration will take care of constructing
any dependencies.
@@ -365,45 +337,39 @@
// ensure the component factory is in the STARTED state!
Component c = componentLookup.get(cfClass.getName());
- if (c.instance != cf)
- {
+ if (c.instance != cf) {
throw new ConfigurationException("Component factory " + cfClass +
" incorrectly registered!");
}
return cf;
}
/**
- * Scans the class path for classes annotated with {@link
org.jboss.cache.factories.annotations.DefaultFactoryFor}, and
- * analyses which components can be created by such factories.
+ * Scans the class path for classes annotated with {@link
org.jboss.cache.factories.annotations.DefaultFactoryFor},
+ * and analyses which components can be created by such factories.
*/
- void scanDefaultFactories()
- {
+ void scanDefaultFactories() {
defaultFactories = new HashMap<Class, Class<? extends
ComponentFactory>>();
Set<Class<? extends ComponentFactory>> factories =
getHardcodedFactories();
- for (Class<? extends ComponentFactory> factory : factories)
- {
+ for (Class<? extends ComponentFactory> factory : factories) {
DefaultFactoryFor dFFAnnotation =
factory.getAnnotation(DefaultFactoryFor.class);
for (Class targetClass : dFFAnnotation.classes())
defaultFactories.put(targetClass, factory);
}
}
/**
- * No such thing as a meta factory yet. Factories are created using this method which
attempts to use an empty public
- * constructor.
+ * No such thing as a meta factory yet. Factories are created using this method which
attempts to use an empty
+ * public constructor.
*
* @param factory class of factory to be created
* @return factory instance
*/
- ComponentFactory instantiateFactory(Class<? extends ComponentFactory> factory)
- {
- try
- {
+ ComponentFactory instantiateFactory(Class<? extends ComponentFactory> factory)
{
+ try {
return factory.newInstance();
}
- catch (Exception e)
- {
+ catch (Exception e) {
// unable to get a hold of an instance!!
throw new ConfigurationException("Unable to instantiate factory " +
factory, e);
}
@@ -414,8 +380,7 @@
*
* @param type type of component to register as a null
*/
- void registerNullComponent(Class type)
- {
+ void registerNullComponent(Class type) {
registerComponent(NULL_COMPONENT, type);
}
@@ -426,39 +391,30 @@
* @return component, or null if it cannot be found
*/
@SuppressWarnings("unchecked")
- protected <T> T getFromConfiguration(Class<T> componentClass)
- {
- if (log.isDebugEnabled())
- {
+ protected <T> T getFromConfiguration(Class<T> componentClass) {
+ if (log.isDebugEnabled()) {
log.debug("Looking in configuration for an instance of " +
componentClass + " that may have been injected from an external source.");
}
Method getter = BeanUtils.getterMethod(Configuration.class, componentClass);
T returnValue = null;
- if (getter != null)
- {
- try
- {
+ if (getter != null) {
+ try {
returnValue = (T) getter.invoke(getConfiguration());
}
- catch (Exception e)
- {
+ catch (Exception e) {
log.warn("Unable to invoke getter " + getter + " on
Configuration.class!", e);
}
}
// now try the RuntimeConfig - a legacy "registry" of sorts.
- if (returnValue == null)
- {
+ if (returnValue == null) {
getter = BeanUtils.getterMethod(RuntimeConfig.class, componentClass);
- if (getter != null)
- {
- try
- {
+ if (getter != null) {
+ try {
returnValue = (T) getter.invoke(getConfiguration().getRuntimeConfig());
}
- catch (Exception e)
- {
+ catch (Exception e) {
log.warn("Unable to invoke getter " + getter + " on
RuntimeConfig.class!", e);
}
}
@@ -471,8 +427,7 @@
*
* @return a Configuration object
*/
- protected Configuration getConfiguration()
- {
+ protected Configuration getConfiguration() {
// this is assumed to always be present as a part of the bootstrap/construction of
a ComponentRegistry.
return getComponent(Configuration.class);
}
@@ -484,8 +439,7 @@
* @return component, or null
*/
@SuppressWarnings("unchecked")
- public <T> T getComponent(Class<T> type)
- {
+ public <T> T getComponent(Class<T> type) {
Component wrapper = componentLookup.get(type.getName());
if (wrapper == null) return null;
@@ -495,11 +449,9 @@
/**
* Rewires components. Can only be called if the current state is WIRED or STARTED.
*/
- public void rewire()
- {
+ public void rewire() {
// need to re-inject everything again.
- for (Component c : new HashSet<Component>(componentLookup.values()))
- {
+ for (Component c : new HashSet<Component>(componentLookup.values())) {
// inject dependencies for this component
c.injectDependencies();
}
@@ -509,20 +461,16 @@
* Scans each registered component for lifecycle methods, and adds them to the
appropriate lists, and then sorts them
* by priority.
*/
- private void populateLifecycleMethods()
- {
- for (Component c : componentLookup.values())
- {
- if (!c.methodsScanned)
- {
+ private void populateLifecycleMethods() {
+ for (Component c : componentLookup.values()) {
+ if (!c.methodsScanned) {
c.methodsScanned = true;
c.startMethods.clear();
c.stopMethods.clear();
c.destroyMethods.clear();
List<Method> methods =
ReflectionUtil.getAllMethods(c.instance.getClass(), Start.class);
- for (Method m : methods)
- {
+ for (Method m : methods) {
PrioritizedMethod em = new PrioritizedMethod();
em.component = c;
em.method = m;
@@ -531,8 +479,7 @@
}
methods = ReflectionUtil.getAllMethods(c.instance.getClass(), Stop.class);
- for (Method m : methods)
- {
+ for (Method m : methods) {
PrioritizedMethod em = new PrioritizedMethod();
em.component = c;
em.method = m;
@@ -541,8 +488,7 @@
}
methods = ReflectionUtil.getAllMethods(c.instance.getClass(),
Destroy.class);
- for (Method m : methods)
- {
+ for (Method m : methods) {
PrioritizedMethod em = new PrioritizedMethod();
em.component = c;
em.method = m;
@@ -556,14 +502,11 @@
/**
* Removes any components not annotated as @NonVolatile.
*/
- public void resetNonVolatile()
- {
+ public void resetNonVolatile() {
// destroy all components to clean up resources
- for (Component c : new HashSet<Component>(componentLookup.values()))
- {
+ for (Component c : new HashSet<Component>(componentLookup.values())) {
// the component is volatile!!
- if (!c.nonVolatile)
- {
+ if (!c.nonVolatile) {
componentLookup.remove(c.name);
}
}
@@ -575,29 +518,22 @@
// These methods perform a check for appropriate transition and then delegate to
similarly named internal methods.
/**
- * Creates the components needed by a cache instance and sets the cache status to
{@link org.jboss.cache.CacheStatus#CREATED}
- * when it is done.
+ * Creates the components needed by a cache instance and sets the cache status to
{@link
+ * org.jboss.cache.CacheStatus#CREATED} when it is done.
*/
- public void create()
- {
- if (!state.createAllowed())
- {
- if (state.needToDestroyFailedCache())
- {
+ public void create() {
+ if (!state.createAllowed()) {
+ if (state.needToDestroyFailedCache()) {
destroy();
- }
- else
- {
+ } else {
return;
}
}
- try
- {
+ try {
internalCreate();
}
- catch (Throwable t)
- {
+ catch (Throwable t) {
handleLifecycleTransitionFailure(t);
}
}
@@ -606,103 +542,81 @@
* This starts the components in the cache, connecting to channels, starting service
threads, etc. If the cache is
* not in the {@link org.jboss.cache.CacheStatus#CREATED} state, {@link #create()}
will be invoked first.
*/
- public void start()
- {
+ public void start() {
boolean createdInStart = false;
- if (!state.startAllowed())
- {
- if (state.needToDestroyFailedCache())
- {
+ if (!state.startAllowed()) {
+ if (state.needToDestroyFailedCache()) {
destroy(); // this will take us back to DESTROYED
}
- if (state.needCreateBeforeStart())
- {
+ if (state.needCreateBeforeStart()) {
create();
createdInStart = true;
- }
- else
- {
+ } else {
return;
}
}
- try
- {
+ try {
internalStart(createdInStart);
}
- catch (Throwable t)
- {
+ catch (Throwable t) {
handleLifecycleTransitionFailure(t);
}
}
/**
- * Stops the cache and sets the cache status to {@link
org.jboss.cache.CacheStatus#STOPPED} once it is done. If the cache is not in
- * the {@link org.jboss.cache.CacheStatus#STARTED} state, this is a no-op.
+ * Stops the cache and sets the cache status to {@link
org.jboss.cache.CacheStatus#STOPPED} once it is done. If the
+ * cache is not in the {@link org.jboss.cache.CacheStatus#STARTED} state, this is a
no-op.
*/
- public void stop()
- {
- if (!state.stopAllowed())
- {
+ public void stop() {
+ if (!state.stopAllowed()) {
return;
}
// Trying to stop() from FAILED is valid, but may not work
boolean failed = state == CacheStatus.FAILED;
- try
- {
+ try {
internalStop();
}
- catch (Throwable t)
- {
- if (failed)
- {
+ catch (Throwable t) {
+ if (failed) {
log.warn("Attempted to stop() from FAILED state, but caught exception;
try calling destroy()", t);
}
failed = true;
handleLifecycleTransitionFailure(t);
}
- finally
- {
+ finally {
if (!failed) state = CacheStatus.STOPPED;
}
}
/**
- * Destroys the cache and frees up any resources. Sets the cache status to {@link
CacheStatus#DESTROYED} when it is done.
+ * Destroys the cache and frees up any resources. Sets the cache status to {@link
CacheStatus#DESTROYED} when it is
+ * done.
* <p/>
- * If the cache is in {@link org.jboss.cache.CacheStatus#STARTED} when this method is
called, it will first call {@link #stop()}
- * to stop the cache.
+ * If the cache is in {@link org.jboss.cache.CacheStatus#STARTED} when this method is
called, it will first call
+ * {@link #stop()} to stop the cache.
*/
- public void destroy()
- {
- if (!state.destroyAllowed())
- {
- if (state.needStopBeforeDestroy())
- {
- try
- {
+ public void destroy() {
+ if (!state.destroyAllowed()) {
+ if (state.needStopBeforeDestroy()) {
+ try {
stop();
}
- catch (CacheException e)
- {
+ catch (CacheException e) {
log.warn("Needed to call stop() before destroying but stop() threw
exception. Proceeding to destroy", e);
}
- }
- else
- {
+ } else {
return;
}
}
- try
- {
+ try {
internalDestroy();
}
- finally
- {
+ finally {
// We always progress to destroyed
state = CacheStatus.DESTROYED;
}
@@ -712,29 +626,20 @@
// ------------------------------ START: Actual internal lifecycle methods
--------------------------------
/**
- * Sets the cacheStatus to FAILED and rethrows the problem as one
- * of the declared types. Converts any non-RuntimeException Exception
- * to CacheException.
+ * Sets the cacheStatus to FAILED and rethrows the problem as one of the declared
types. Converts any
+ * non-RuntimeException Exception to CacheException.
*
* @param t throwable thrown during failure
*/
- private void handleLifecycleTransitionFailure(Throwable t)
- {
+ private void handleLifecycleTransitionFailure(Throwable t) {
state = CacheStatus.FAILED;
- if (t instanceof CacheException)
- {
+ if (t instanceof CacheException) {
throw (CacheException) t;
- }
- else if (t instanceof RuntimeException)
- {
+ } else if (t instanceof RuntimeException) {
throw (RuntimeException) t;
- }
- else if (t instanceof Error)
- {
+ } else if (t instanceof Error) {
throw (Error) t;
- }
- else
- {
+ } else {
throw new CacheException(t);
}
}
@@ -742,18 +647,15 @@
/**
* The actual create implementation.
*/
- private void internalCreate()
- {
+ private void internalCreate() {
state = CacheStatus.CREATING;
resetNonVolatile();
rewire();
state = CacheStatus.CREATED;
}
- private void internalStart(boolean createdInStart) throws CacheException,
IllegalArgumentException
- {
- if (!createdInStart)
- {
+ private void internalStart(boolean createdInStart) throws CacheException,
IllegalArgumentException {
+ if (!createdInStart) {
// re-wire all dependencies in case stuff has changed since the cache was
created
// remove any components whose construction may have depended upon a
configuration that may have changed.
resetNonVolatile();
@@ -783,41 +685,31 @@
state = CacheStatus.STARTED;
}
- private void addShutdownHook()
- {
+ private void addShutdownHook() {
ArrayList al = MBeanServerFactory.findMBeanServer(null);
boolean registerShutdownHook = (getConfiguration().getShutdownHookBehavior() ==
Configuration.ShutdownHookBehavior.DEFAULT && al.size() == 0)
|| getConfiguration().getShutdownHookBehavior() ==
Configuration.ShutdownHookBehavior.REGISTER;
- if (registerShutdownHook)
- {
- if (log.isTraceEnabled())
- {
+ if (registerShutdownHook) {
+ if (log.isTraceEnabled()) {
log.trace("Registering a shutdown hook. Configured behavior = " +
getConfiguration().getShutdownHookBehavior());
}
- shutdownHook = new Thread()
- {
+ shutdownHook = new Thread() {
@Override
- public void run()
- {
- try
- {
+ public void run() {
+ try {
invokedFromShutdownHook = true;
ComponentRegistry.this.stop();
}
- finally
- {
+ finally {
invokedFromShutdownHook = false;
}
}
};
Runtime.getRuntime().addShutdownHook(shutdownHook);
- }
- else
- {
- if (log.isTraceEnabled())
- {
+ } else {
+ if (log.isTraceEnabled()) {
log.trace("Not registering a shutdown hook. Configured behavior =
" + getConfiguration().getShutdownHookBehavior());
}
}
@@ -826,8 +718,7 @@
/**
* Actual stop
*/
- private void internalStop()
- {
+ private void internalStop() {
state = CacheStatus.STOPPING;
// if this is called from a source other than the shutdown hook, deregister the
shutdown hook.
if (!invokedFromShutdownHook && shutdownHook != null)
Runtime.getRuntime().removeShutdownHook(shutdownHook);
@@ -846,8 +737,7 @@
/**
* Actual destroy
*/
- private void internalDestroy()
- {
+ private void internalDestroy() {
state = CacheStatus.DESTROYING;
@@ -867,16 +757,16 @@
// ------------------------------ END: Actual internal lifecycle methods
--------------------------------
/**
- * Asserts whether invocations are allowed on the cache or not. Returns
<tt>true</tt> if invocations are to be allowed,
- * <tt>false</tt> otherwise. If the origin of the call is remote and the
cache status is {@link org.jboss.cache.CacheStatus#STARTING},
- * this method will block for up to {@link
org.jboss.cache.config.Configuration#getStateRetrievalTimeout()} millis, checking
- * for a valid state.
+ * Asserts whether invocations are allowed on the cache or not. Returns
<tt>true</tt> if invocations are to be
+ * allowed, <tt>false</tt> otherwise. If the origin of the call is remote
and the cache status is {@link
+ * org.jboss.cache.CacheStatus#STARTING}, this method will block for up to {@link
+ * org.jboss.cache.config.Configuration#getStateRetrievalTimeout()} millis, checking
for a valid state.
*
- * @param originLocal true if the call originates locally (i.e., from the {@link
org.jboss.cache.invocation.CacheInvocationDelegate} or false if it originates remotely,
i.e., from the {@link org.jboss.cache.marshall.CommandAwareRpcDispatcher}.
+ * @param originLocal true if the call originates locally (i.e., from the {@link
org.jboss.cache.invocation.CacheInvocationDelegate}
+ * or false if it originates remotely, i.e., from the {@link
org.jboss.cache.marshall.CommandAwareRpcDispatcher}.
* @return true if invocations are allowed, false otherwise.
*/
- public boolean invocationsAllowed(boolean originLocal)
- {
+ public boolean invocationsAllowed(boolean originLocal) {
log.trace("Testing if invocations are allowed.");
if (state.allowInvocations()) return true;
@@ -886,21 +776,16 @@
log.trace("Is remotely originating.");
// else if this is a remote call and the status is STARTING, wait until the cache
starts.
- if (state == CacheStatus.STARTING && blockInStarting)
- {
+ if (state == CacheStatus.STARTING && blockInStarting) {
log.trace("Cache is starting; block.");
- try
- {
+ try {
blockUntilCacheStarts();
return true;
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- }
- else if (blockInStarting)
- {
+ } else if (blockInStarting) {
log.warn("Received a remote call but the cache is not in STARTED state -
ignoring call.");
}
@@ -908,28 +793,25 @@
}
/**
- * Blocks until the current cache instance is in its {@link
org.jboss.cache.CacheStatus#STARTED started} phase. Blocks
- * for up to {@link org.jboss.cache.config.Configuration#getStateRetrievalTimeout()}
milliseconds, throwing an IllegalStateException
- * if the cache doesn't reach this state even after this maximum wait time.
+ * Blocks until the current cache instance is in its {@link
org.jboss.cache.CacheStatus#STARTED started} phase.
+ * Blocks for up to {@link
org.jboss.cache.config.Configuration#getStateRetrievalTimeout()} milliseconds, throwing
an
+ * IllegalStateException if the cache doesn't reach this state even after this
maximum wait time.
*
* @throws InterruptedException if interrupted while waiting
* @throws IllegalStateException if even after waiting the cache has not started.
*/
- private void blockUntilCacheStarts() throws InterruptedException,
IllegalStateException
- {
+ private void blockUntilCacheStarts() throws InterruptedException,
IllegalStateException {
int pollFrequencyMS = 100;
long startupWaitTime = getConfiguration().getStateRetrievalTimeout();
long giveUpTime = System.currentTimeMillis() + startupWaitTime;
- while (System.currentTimeMillis() < giveUpTime)
- {
+ while (System.currentTimeMillis() < giveUpTime) {
if (state.allowInvocations()) break;
Thread.sleep(pollFrequencyMS);
}
// check if we have started.
- if (!state.allowInvocations())
- {
+ if (!state.allowInvocations()) {
throw new IllegalStateException("Cache not in STARTED state, even after
waiting " + getConfiguration().getStateRetrievalTimeout() + " millis.");
}
}
@@ -937,8 +819,7 @@
/**
* A wrapper representing a component in the registry
*/
- public class Component
- {
+ public class Component {
/**
* A reference to the object instance for this component.
*/
@@ -961,8 +842,7 @@
boolean nonVolatile;
@Override
- public String toString()
- {
+ public String toString() {
return "Component{" +
"instance=" + instance +
", name=" + name +
@@ -973,18 +853,15 @@
/**
* Injects dependencies into this component.
*/
- public void injectDependencies()
- {
+ public void injectDependencies() {
for (Method m : injectionMethods) invokeInjectionMethod(instance, m);
}
- public Object getInstance()
- {
+ public Object getInstance() {
return instance;
}
- public String getName()
- {
+ public String getName() {
return name;
}
}
@@ -993,25 +870,21 @@
/**
* Wrapper to encapsulate a method along with a priority
*/
- static class PrioritizedMethod implements Comparable<PrioritizedMethod>
- {
+ static class PrioritizedMethod implements Comparable<PrioritizedMethod> {
Method method;
Component component;
int priority;
- public int compareTo(PrioritizedMethod o)
- {
+ public int compareTo(PrioritizedMethod o) {
return (priority < o.priority ? -1 : (priority == o.priority ? 0 : 1));
}
- void invoke()
- {
+ void invoke() {
ReflectionUtil.invokeAccessibly(component.instance, method, null);
}
@Override
- public String toString()
- {
+ public String toString() {
return "PrioritizedMethod{" +
"method=" + method +
", priority=" + priority +
@@ -1022,14 +895,12 @@
/**
* Returns an immutable set contating all the components that exists in the
reporsitory at this moment.
*/
- public Set<Component> getRegiteredComponents()
- {
+ public Set<Component> getRegiteredComponents() {
HashSet<Component> defensiveCopy = new
HashSet<Component>(componentLookup.values());
return Collections.unmodifiableSet(defensiveCopy);
}
- public void setBlockInStarting(boolean blockInStarting)
- {
+ public void setBlockInStarting(boolean blockInStarting) {
this.blockInStarting = blockInStarting;
}
}
Deleted:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -1,40 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.cache.factories;
-
-import org.jboss.cache.factories.annotations.DefaultFactoryFor;
-import org.jboss.cache.transaction.TransactionLog;
-
-/**
- * Constructs {@link org.jboss.cache.transaction.TransactionLog} instances.
- *
- * @author Jason T. Greene
- * @since 3.0
- */
-@DefaultFactoryFor(classes = TransactionLog.class)
-public class TransactionLogFactory extends ComponentFactory
-{
- protected <T> T construct(Class<T> componentType)
- {
- return componentType.cast(new TransactionLog());
- }
-}
Modified:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -21,19 +21,6 @@
*/
package org.jboss.cache.interceptors;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-
import org.jboss.cache.CacheException;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.RPCManager;
@@ -64,29 +51,37 @@
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionContext;
-import org.jboss.cache.transaction.TransactionLog;
import org.jboss.cache.transaction.TransactionTable;
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
- * This interceptor is the new default at the head of all interceptor chains,
- * and makes transactional attributes available to all interceptors in the chain.
- * This interceptor is also responsible for registering for synchronisation on
- * transaction completion.
+ * This interceptor is the new default at the head of all interceptor chains, and makes
transactional attributes
+ * available to all interceptors in the chain. This interceptor is also responsible for
registering for synchronisation
+ * on transaction completion.
*
* @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik
AT jboss DOT org)</a>
* @author <a href="mailto:stevew@jofti.com">Steve Woodcock
(stevew(a)jofti.com)</a>
* @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
*/
-public class TxInterceptor extends BaseTransactionalContextInterceptor
-{
+public class TxInterceptor extends BaseTransactionalContextInterceptor {
protected CommandsFactory commandsFactory;
protected RPCManager rpcManager;
private Notifier notifier;
private InvocationContextContainer invocationContextContainer;
private ComponentRegistry componentRegistry;
private ContextFactory contextFactory;
- private TransactionLog transactionLog;
/**
* List <Transaction>that we have registered for
@@ -104,44 +99,36 @@
@Inject
public void intialize(RPCManager rpcManager, ContextFactory contextFactory,
Notifier notifier, InvocationContextContainer icc,
- TransactionLog transactionLog, CommandsFactory factory,
- ComponentRegistry componentRegistry, LockManager lockManager)
- {
+ CommandsFactory factory,
+ ComponentRegistry componentRegistry, LockManager lockManager) {
this.contextFactory = contextFactory;
- this.transactionLog = transactionLog;
this.commandsFactory = factory;
this.rpcManager = rpcManager;
this.notifier = notifier;
this.invocationContextContainer = icc;
this.componentRegistry = componentRegistry;
this.lockManager = lockManager;
- setStatisticsEnabled(configuration.getExposeManagementStatistics());
+ setStatisticsEnabled(configuration.isExposeManagementStatistics());
}
@Override
- public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
- {
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable {
Object result = null;
// this is a prepare, commit, or rollback.
GlobalTransaction gtx = ctx.getGlobalTransaction();
if (trace) log.trace("Got gtx from invocation context " + gtx);
- try
- {
- if (gtx.isRemote())
- {
+ try {
+ if (gtx.isRemote()) {
result = handleRemotePrepare(ctx, command);
if (getStatisticsEnabled()) prepares++;
- }
- else
- {
+ } else {
if (trace) log.trace("received my own message (discarding it)");
result = null;
}
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
ctx.throwIfNeeded(e);
}
@@ -149,27 +136,22 @@
}
@Override
- public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
- {
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable {
GlobalTransaction gtx = ctx.getGlobalTransaction();
- if (!ctx.getGlobalTransaction().isRemote())
- {
+ if (!ctx.getGlobalTransaction().isRemote()) {
if (trace) log.trace("received my own message (discarding it)");
return null;
}
- try
- {
+ try {
if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call
on command [" + command + "]");
Transaction ltx = txTable.getLocalTransaction(gtx, true);
// disconnect if we have a current tx associated
Transaction currentTx = txManager.getTransaction();
boolean resumeCurrentTxOnCompletion = false;
- try
- {
- if (!ltx.equals(currentTx))
- {
+ try {
+ if (!ltx.equals(currentTx)) {
currentTx = txManager.suspend();
resumeCurrentTxOnCompletion = true;
txManager.resume(ltx);
@@ -180,11 +162,9 @@
txManager.commit();
if (getStatisticsEnabled()) commits++;
}
- finally
- {
+ finally {
//resume the old transaction if we suspended it
- if (resumeCurrentTxOnCompletion)
- {
+ if (resumeCurrentTxOnCompletion) {
resumeTransactionOnCompletion(ctx, currentTx);
}
// remove from local lists.
@@ -194,8 +174,7 @@
}
if (trace) log.trace("Finished remote rollback method for " + gtx);
}
- catch (Throwable throwable)
- {
+ catch (Throwable throwable) {
ctx.throwIfNeeded(throwable);
}
@@ -203,21 +182,17 @@
}
@Override
- public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command)
throws Throwable
- {
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command)
throws Throwable {
GlobalTransaction gtx = ctx.getGlobalTransaction();
- if (!gtx.isRemote())
- {
+ if (!gtx.isRemote()) {
if (trace) log.trace("received my own message (discarding it)");
return null;
}
- try
- {
+ try {
if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call
on command [" + command + "]");
Transaction ltx = txTable.getLocalTransaction(gtx);
- if (ltx == null)
- {
+ if (ltx == null) {
log.warn("No local transaction for this remotely originating rollback.
Possibly rolling back before a prepare call was broadcast?");
txTable.remove(gtx);
return null;
@@ -225,10 +200,8 @@
// disconnect if we have a current tx associated
Transaction currentTx = txManager.getTransaction();
boolean resumeCurrentTxOnCompletion = false;
- try
- {
- if (!ltx.equals(currentTx))
- {
+ try {
+ if (!ltx.equals(currentTx)) {
currentTx = txManager.suspend();
resumeCurrentTxOnCompletion = true;
txManager.resume(ltx);
@@ -239,11 +212,9 @@
txManager.rollback();
if (getStatisticsEnabled()) rollbacks++;
}
- finally
- {
+ finally {
//resume the old transaction if we suspended it
- if (resumeCurrentTxOnCompletion)
- {
+ if (resumeCurrentTxOnCompletion) {
resumeTransactionOnCompletion(ctx, currentTx);
}
@@ -255,8 +226,7 @@
}
if (trace) log.trace("Finished remote commit/rollback method for " +
gtx);
}
- catch (Throwable throwable)
- {
+ catch (Throwable throwable) {
ctx.throwIfNeeded(throwable);
}
@@ -264,38 +234,28 @@
}
@Override
- public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable
- {
+ public Object visitInvalidateCommand(InvocationContext ctx, InvalidateCommand command)
throws Throwable {
return invokeNextInterceptor(ctx, command);
}
/**
- * Tests if we already have a tx running. If so, register a sync handler for this
method invocation.
- * if not, create a local tx if we're using opt locking.
+ * Tests if we already have a tx running. If so, register a sync handler for this
method invocation. if not, create
+ * a local tx if we're using opt locking.
*
* @throws Throwable
*/
@Override
- public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable
- {
- try
- {
- Object ret = attachGtxAndPassUpChain(ctx, command);
-
- if (command instanceof WriteCommand && ctx.getTransaction() == null)
- transactionLog.logNoTxWrite((WriteCommand)command);
-
- return ret;
+ public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable {
+ try {
+ return attachGtxAndPassUpChain(ctx, command);
}
- catch (Throwable throwable)
- {
+ catch (Throwable throwable) {
ctx.throwIfNeeded(throwable);
return null;
}
}
- protected Object attachGtxAndPassUpChain(InvocationContext ctx, VisitableCommand
command) throws Throwable
- {
+ protected Object attachGtxAndPassUpChain(InvocationContext ctx, VisitableCommand
command) throws Throwable {
Transaction tx = ctx.getTransaction();
if (tx != null) attachGlobalTransaction(ctx, tx, command);
return invokeNextInterceptor(ctx, command);
@@ -308,16 +268,15 @@
// --------------------------------------------------------------
/**
- * Handles a remotely originating prepare call, by creating a local transaction for
the remote global transaction
- * and replaying modifications in this new local transaction.
+ * Handles a remotely originating prepare call, by creating a local transaction for
the remote global transaction and
+ * replaying modifications in this new local transaction.
*
* @param ctx invocation context
* @param command prepare command
* @return result of the prepare, typically a null.
* @throws Throwable in the event of problems.
*/
- private Object handleRemotePrepare(InvocationContext ctx, PrepareCommand command)
throws Throwable
- {
+ private Object handleRemotePrepare(InvocationContext ctx, PrepareCommand command)
throws Throwable {
// the InvocationContextInterceptor would have set this for us
GlobalTransaction gtx = ctx.getGlobalTransaction();
@@ -328,10 +287,8 @@
Object retval = null;
boolean success = false;
- try
- {
- if (ltx == null)
- {
+ try {
+ if (ltx == null) {
if (currentTx != null) txManager.suspend();
// create a new local transaction
ltx = createLocalTx();
@@ -339,22 +296,17 @@
txTable.put(ltx, gtx);
if (trace) log.trace("Created new tx for gtx " + gtx);
- if (trace)
- {
+ if (trace) {
log.trace("Started new local tx as result of remote prepare: local
tx=" + ltx + " (status=" + ltx.getStatus() + "), gtx=" + gtx);
}
- }
- else
- {
+ } else {
//this should be valid
- if (!TransactionTable.isValid(ltx))
- {
+ if (!TransactionTable.isValid(ltx)) {
throw new CacheException("Transaction " + ltx + " not in
correct state to be prepared");
}
//associate this thread with the local transaction associated with the global
transaction, IF the localTx is NOT the current tx.
- if (currentTx == null || !ltx.equals(currentTx))
- {
+ if (currentTx == null || !ltx.equals(currentTx)) {
if (trace) log.trace("Suspending current tx " + currentTx);
txManager.suspend();
txManager.resume(ltx);
@@ -369,8 +321,7 @@
// below will need this transactionContext to add their modifications
// under the GlobalTx key
TransactionContext transactionContext = txTable.get(gtx);
- if (transactionContext == null)
- {
+ if (transactionContext == null) {
// create a new transaction transactionContext
if (trace) log.trace("creating new tx transactionContext");
transactionContext = contextFactory.createTransactionContext(ltx);
@@ -390,61 +341,44 @@
success = true; // no exceptions were thrown above!!
// now pass the prepare command up the chain as well.
- if (command.isOnePhaseCommit())
- {
- if (trace)
- {
+ if (command.isOnePhaseCommit()) {
+ if (trace) {
log.trace("Using one-phase prepare. Not propagating the prepare call
up the stack until called to do so by the sync handler.");
}
- }
- else
- {
- transactionLog.logPrepare(command);
-
+ } else {
// now pass up the prepare method itself.
invokeNextInterceptor(ctx, command);
}
// JBCACHE-361 Confirm that the transaction is ACTIVE
assertTxIsStillValid(ltx);
}
- finally
- {
+ finally {
// if we are running a one-phase commit, perform a commit or rollback now.
if (trace) log.trace("Are we running a 1-phase commit? " +
command.isOnePhaseCommit());
- if (command.isOnePhaseCommit())
- {
- try
- {
- if (success)
- {
+ if (command.isOnePhaseCommit()) {
+ try {
+ if (success) {
ltx.commit();
- }
- else
- {
+ } else {
ltx.rollback();
}
}
- catch (Throwable t)
- {
+ catch (Throwable t) {
log.error("Commit/rollback failed.", t);
- if (success)
- {
+ if (success) {
// try another rollback...
- try
- {
+ try {
log.info("Attempting anotehr rollback");
//invokeOnePhaseCommitMethod(globalTransaction, modifications.size()
> 0, false);
ltx.rollback();
}
- catch (Throwable t2)
- {
+ catch (Throwable t2) {
log.error("Unable to rollback", t2);
}
}
}
- finally
- {
+ finally {
transactions.remove(ltx);// JBAS-298
}
}
@@ -458,26 +392,20 @@
return retval;
}
- private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction
tx, VisitableCommand command) throws Throwable
- {
- if (trace)
- {
+ private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction
tx, VisitableCommand command) throws Throwable {
+ if (trace) {
log.trace(" local transaction exists - registering global tx if not present
for " + Thread.currentThread());
}
- if (trace)
- {
+ if (trace) {
GlobalTransaction tempGtx = txTable.get(tx);
log.trace("Associated gtx in txTable is " + tempGtx);
}
// register a sync handler for this tx - only if the globalTransaction is not
remotely initiated.
GlobalTransaction gtx = registerTransaction(tx, ctx);
- if (gtx != null)
- {
+ if (gtx != null) {
command = replaceGtx(command, gtx);
- }
- else
- {
+ } else {
// get the current globalTransaction from the txTable.
gtx = txTable.get(tx);
}
@@ -491,31 +419,25 @@
/**
* Replays modifications
*/
- protected void replayModifications(InvocationContext ctx, Transaction ltx,
PrepareCommand command) throws Throwable
- {
- try
- {
+ protected void replayModifications(InvocationContext ctx, Transaction ltx,
PrepareCommand command) throws Throwable {
+ try {
// replay modifications
- for (WriteCommand modification : command.getModifications())
- {
+ for (WriteCommand modification : command.getModifications()) {
invokeNextInterceptor(ctx, modification);
assertTxIsStillValid(ltx);
}
}
- catch (Throwable th)
- {
+ catch (Throwable th) {
log.error("prepare failed!", th);
throw th;
}
}
private void resumeTransactionOnCompletion(InvocationContext ctx, Transaction
currentTx)
- throws SystemException, InvalidTransactionException
- {
+ throws SystemException, InvalidTransactionException {
if (trace) log.trace("Resuming suspended transaction " + currentTx);
txManager.suspend();
- if (currentTx != null)
- {
+ if (currentTx != null) {
txManager.resume(currentTx);
ctx.setTransaction(currentTx);
}
@@ -528,18 +450,15 @@
* @throws Throwable
*/
@SuppressWarnings("deprecation")
- private Object handleCommitRollback(InvocationContext ctx, VisitableCommand command)
throws Throwable
- {
+ private Object handleCommitRollback(InvocationContext ctx, VisitableCommand command)
throws Throwable {
GlobalTransaction gtx = ctx.getGlobalTransaction();
Object result;
VisitableCommand originalCommand = ctx.getCommand();
ctx.setCommand(command);
- try
- {
+ try {
result = invokeNextInterceptor(ctx, command);
}
- finally
- {
+ finally {
ctx.setCommand(originalCommand);
ctx.setMethodCall(null);
}
@@ -551,65 +470,45 @@
// Transaction phase runners
// --------------------------------------------------------------
- protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List
modifications, boolean onePhaseCommit)
- {
+ protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List
modifications, boolean onePhaseCommit) {
return commandsFactory.buildPrepareCommand(gtx, modifications,
rpcManager.getLocalAddress(), onePhaseCommit);
}
/**
* creates a commit()
*/
- protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List
modifications, boolean onePhaseCommit)
- {
- try
- {
+ protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List
modifications, boolean onePhaseCommit) {
+ try {
if (trace) log.trace("Running commit for " + gtx);
VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx,
modifications, true)
- :
commandsFactory.buildCommitCommand(gtx);
+ : commandsFactory.buildCommitCommand(gtx);
handleCommitRollback(ctx, commitCommand);
-
- if (onePhaseCommit)
- {
- transactionLog.logOnePhaseCommit(gtx, modifications);
- }
- else
- {
- transactionLog.logCommit(gtx);
- }
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
log.warn("Commit failed. Clearing stale locks.");
- try
- {
+ try {
cleanupStaleLocks(ctx);
}
- catch (RuntimeException re)
- {
+ catch (RuntimeException re) {
log.error("Unable to clear stale locks", re);
throw re;
}
- catch (Throwable e2)
- {
+ catch (Throwable e2) {
log.error("Unable to clear stale locks", e2);
throw new RuntimeException(e2);
}
- if (e instanceof RuntimeException)
- {
+ if (e instanceof RuntimeException) {
throw (RuntimeException) e;
- }
- else
- {
+ } else {
throw new RuntimeException("Commit failed.", e);
}
}
}
- protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
- {
+ protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable {
TransactionContext transactionContext = ctx.getTransactionContext();
if (transactionContext != null) lockManager.unlock(ctx);
}
@@ -617,36 +516,28 @@
/**
* creates a rollback()
*/
- protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx,
Transaction tx)
- {
- try
- {
+ protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx,
Transaction tx) {
+ try {
// JBCACHE-457
VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
if (trace) log.trace(" running rollback for " + gtx);
- transactionLog.rollback(gtx);
-
//JBCACHE-359 Store a lookup for the globalTransaction so a listener
// callback can find it
rollbackTransactions.put(tx, gtx);
handleCommitRollback(ctx, rollbackCommand);
}
- catch (Throwable e)
- {
+ catch (Throwable e) {
log.warn("Rollback had a problem", e);
}
- finally
- {
+ finally {
rollbackTransactions.remove(tx);
}
}
- private boolean isOnePhaseCommit()
- {
- if (!configuration.getCacheMode().isSynchronous() && !optimistic)
- {
+ private boolean isOnePhaseCommit() {
+ if (!configuration.getCacheMode().isSynchronous() && !optimistic) {
// this is a REPL_ASYNC call - do 1-phase commit. break!
if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do
nothing for beforeCompletion()");
return true;
@@ -659,13 +550,10 @@
* method call and passes the prepare() call up the chain.
*/
@SuppressWarnings("deprecation")
- public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx,
List<WriteCommand> modifications) throws Throwable
- {
+ public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx,
List<WriteCommand> modifications) throws Throwable {
// running a 2-phase commit.
PrepareCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
- transactionLog.logPrepare(prepareCommand);
-
Object result;
// Is there a local transaction associated with GTX ?
@@ -673,22 +561,17 @@
//if ltx is not null and it is already running
Transaction currentTransaction = txManager.getTransaction();
- if (currentTransaction != null && ltx != null &&
currentTransaction.equals(ltx))
- {
+ if (currentTransaction != null && ltx != null &&
currentTransaction.equals(ltx)) {
VisitableCommand originalCommand = ctx.getCommand();
ctx.setCommand(prepareCommand);
- try
- {
+ try {
result = invokeNextInterceptor(ctx, prepareCommand);
}
- finally
- {
+ finally {
ctx.setCommand(originalCommand);
ctx.setMethodCall(null);
}
- }
- else
- {
+ } else {
log.warn("Local transaction does not exist or does not match expected
transaction " + gtx);
throw new CacheException(" local transaction " + ltx + " does not
exist or does not match expected transaction " + gtx);
}
@@ -699,16 +582,12 @@
// Private helper methods
// --------------------------------------------------------------
- protected void assertTxIsStillValid(Transaction tx)
- {
- if (!TransactionTable.isActive(tx))
- {
- try
- {
+ protected void assertTxIsStillValid(Transaction tx) {
+ if (!TransactionTable.isActive(tx)) {
+ try {
throw new ReplicationException("prepare() failed -- local transaction
status is not STATUS_ACTIVE; is " + tx.getStatus());
}
- catch (SystemException e)
- {
+ catch (SystemException e) {
throw new ReplicationException("prepare() failed -- local transaction
status is not STATUS_ACTIVE; Unable to retrieve transaction status.");
}
}
@@ -717,52 +596,39 @@
/**
* Creates a gtx (if one doesnt exist), a sync handler, and registers the tx.
*/
- private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx)
throws Exception
- {
+ private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx)
throws Exception {
// we have ascertained that the current thread *is* associated with a transaction.
We need to make sure the
// transaction is in a valid state before moving on, and throwing an exception if
not.
boolean txValid = TransactionTable.isValid(tx);
- if (!txValid)
- {
+ if (!txValid) {
throw new IllegalStateException("Transaction " + tx + " is not in
a valid state to be invoking cache operations on.");
}
GlobalTransaction gtx;
- if (transactions.add(tx))
- {
+ if (transactions.add(tx)) {
gtx = txTable.getCurrentTransaction(tx, true);
TransactionContext transactionContext;
- if (ctx.getGlobalTransaction() == null)
- {
+ if (ctx.getGlobalTransaction() == null) {
ctx.setGlobalTransaction(gtx);
transactionContext = txTable.get(gtx);
ctx.setTransactionContext(transactionContext);
- }
- else
- {
+ } else {
transactionContext = ctx.getTransactionContext();
}
- if (gtx.isRemote())
- {
+ if (gtx.isRemote()) {
// should be no need to register a handler since this a remotely initiated
globalTransaction
if (trace) log.trace("is a remotely initiated gtx so no need to register
a tx for it");
- }
- else
- {
+ } else {
if (trace) log.trace("Registering sync handler for tx " + tx +
", gtx " + gtx);
// see the comment in the LocalSyncHandler for the last isOriginLocal param.
LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx,
tx, transactionContext, !ctx.isOriginLocal());
registerHandler(tx, myHandler, ctx);
}
- }
- else if ((gtx = rollbackTransactions.get(tx)) != null)
- {
+ } else if ((gtx = rollbackTransactions.get(tx)) != null) {
if (trace) log.trace("Transaction " + tx + " is already
registered and is rolling back.");
- }
- else
- {
+ } else {
if (trace) log.trace("Transaction " + tx + " is already
registered.");
}
return gtx;
@@ -771,8 +637,7 @@
/**
* Registers a sync hander against a tx.
*/
- private void registerHandler(Transaction tx, Synchronization handler,
InvocationContext ctx) throws Exception
- {
+ private void registerHandler(Transaction tx, Synchronization handler,
InvocationContext ctx) throws Exception {
OrderedSynchronizationHandler orderedHandler =
ctx.getTransactionContext().getOrderedSynchronizationHandler();
//OrderedSynchronizationHandler.getInstance(tx);
if (trace) log.trace("registering for TX completion:
SynchronizationHandler(" + handler + ")");
@@ -785,69 +650,58 @@
/**
* Replaces the global transaction in a VisitableCommand with a new global transaction
passed in.
*/
- private VisitableCommand replaceGtx(VisitableCommand command, final GlobalTransaction
gtx) throws Throwable
- {
- command.acceptVisitor(null, new AbstractVisitor()
- {
+ private VisitableCommand replaceGtx(VisitableCommand command, final GlobalTransaction
gtx) throws Throwable {
+ command.acceptVisitor(null, new AbstractVisitor() {
@Override
- public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand
command) throws Throwable
- {
+ public Object visitPutDataMapCommand(InvocationContext ctx, PutDataMapCommand
command) throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@Override
- public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable
- {
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand
command) throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@Override
- public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand
command) throws Throwable
- {
+ public Object visitClearDataCommand(InvocationContext ctx, ClearDataCommand
command) throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@Override
- public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand
command) throws Throwable
- {
+ public Object visitRemoveNodeCommand(InvocationContext ctx, RemoveNodeCommand
command) throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@Override
- public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand
command) throws Throwable
- {
+ public Object visitRemoveKeyCommand(InvocationContext ctx, RemoveKeyCommand
command) throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@Override
- public Object visitCommitCommand(InvocationContext ctx, CommitCommand command)
throws Throwable
- {
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command)
throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@Override
- public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable
- {
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command)
throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@Override
- public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand
command) throws Throwable
- {
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand
command) throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@Override
- public Object visitOptimisticPrepareCommand(InvocationContext ctx,
OptimisticPrepareCommand command) throws Throwable
- {
+ public Object visitOptimisticPrepareCommand(InvocationContext ctx,
OptimisticPrepareCommand command) throws Throwable {
command.setGlobalTransaction(gtx);
return null;
}
@@ -860,10 +714,8 @@
*
* @throws Exception
*/
- protected Transaction createLocalTx() throws Exception
- {
- if (trace)
- {
+ protected Transaction createLocalTx() throws Exception {
+ if (trace) {
log.trace("Creating transaction for thread " +
Thread.currentThread());
}
Transaction localTx;
@@ -879,27 +731,23 @@
// this controls the whole transaction
- private class RemoteSynchronizationHandler implements Synchronization
- {
+ private class RemoteSynchronizationHandler implements Synchronization {
Transaction tx = null;
GlobalTransaction gtx = null;
List<WriteCommand> modifications = null;
TransactionContext transactionContext = null;
protected InvocationContext ctx; // the context for this call.
- RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx,
TransactionContext entry)
- {
+ RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx,
TransactionContext entry) {
this.gtx = gtx;
this.tx = tx;
this.transactionContext = entry;
}
- public void beforeCompletion()
- {
+ public void beforeCompletion() {
if (trace) log.trace("Running beforeCompletion on gtx " + gtx);
- if (transactionContext == null)
- {
+ if (transactionContext == null) {
log.error("Transaction has a null transaction entry - beforeCompletion()
will fail.");
throw new IllegalStateException("cannot find transaction entry for
" + gtx);
}
@@ -908,8 +756,7 @@
ctx = invocationContextContainer.get();
setTransactionalContext(tx, gtx, transactionContext, ctx);
- if (ctx.isOptionsUninitialised() && transactionContext.getOption() !=
null)
- {
+ if (ctx.isOptionsUninitialised() && transactionContext.getOption() !=
null) {
ctx.setOptionOverrides(transactionContext.getOption());
}
@@ -920,39 +767,32 @@
// this should really not be done here -
// it is supposed to be post commit not actually run the commit
- public void afterCompletion(int status)
- {
+ public void afterCompletion(int status) {
// could happen if a rollback is called and beforeCompletion() doesn't get
called.
- if (ctx == null)
- {
+ if (ctx == null) {
ctx = invocationContextContainer.get();
setTransactionalContext(tx, gtx, transactionContext, ctx);
- if (ctx.isOptionsUninitialised() && transactionContext != null
&& transactionContext.getOption() != null)
- {
+ if (ctx.isOptionsUninitialised() && transactionContext != null
&& transactionContext.getOption() != null) {
// use the options from the transaction entry instead
ctx.setOptionOverrides(transactionContext.getOption());
}
}
- try
- {
+ try {
assertCanContinue();
- try
- {
+ try {
if (txManager.getTransaction() != null &&
!txManager.getTransaction().equals(tx)) txManager.resume(tx);
}
- catch (Exception e)
- {
+ catch (Exception e) {
log.error("afterCompletion error: " + status, e);
}
if (trace) log.trace("calling aftercompletion for " + gtx);
// set any transaction wide options as current for this thread.
- if (transactionContext != null)
- {
+ if (transactionContext != null) {
// this should ideally be set in beforeCompletion(), after compacting the
list.
if (modifications == null) modifications =
transactionContext.getModifications();
boolean isSuppressEventNotification =
ctx.getOptionOverrides().isSuppressEventNotification();
@@ -961,8 +801,7 @@
}
if (tx != null) transactions.remove(tx);
- switch (status)
- {
+ switch (status) {
case Status.STATUS_COMMITTED:
boolean onePhaseCommit = isOnePhaseCommit();
if (trace) log.trace("Running commit phase. One phase? " +
onePhaseCommit);
@@ -982,13 +821,11 @@
throw new IllegalStateException("illegal status: " +
status);
}
}
- catch (Exception th)
- {
+ catch (Exception th) {
log.trace("Caught exception ", th);
}
- finally
- {
+ finally {
// clean up the tx table
txTable.remove(gtx);
txTable.remove(tx);
@@ -997,21 +834,18 @@
}
}
- private void assertCanContinue()
- {
- if (!componentRegistry.invocationsAllowed(true) &&
(ctx.getOptionOverrides() == null || !ctx.getOptionOverrides().isSkipCacheStatusCheck()))
- {
+ private void assertCanContinue() {
+ if (!componentRegistry.invocationsAllowed(true) &&
(ctx.getOptionOverrides() == null || !ctx.getOptionOverrides().isSkipCacheStatusCheck()))
{
throw new IllegalStateException("Cache not in STARTED state!");
}
}
/**
- * Cleans out (nullifies) member variables held by the sync object for easier gc.
Could be (falsely) seen as a mem
- * leak if the TM implementation hangs on to the synchronizations for an
unnecessarily long time even after the tx
- * completes. See JBCACHE-1007.
+ * Cleans out (nullifies) member variables held by the sync object for easier gc.
Could be (falsely) seen as a
+ * mem leak if the TM implementation hangs on to the synchronizations for an
unnecessarily long time even after
+ * the tx completes. See JBCACHE-1007.
*/
- private void cleanupInternalState()
- {
+ private void cleanupInternalState() {
tx = null;
gtx = null;
modifications = null;
@@ -1020,16 +854,13 @@
}
@Override
- public String toString()
- {
+ public String toString() {
return "TxInterceptor.RemoteSynchronizationHandler(gtx=" + gtx +
", tx=" + getTxAsString() + ")";
}
- protected String getTxAsString()
- {
+ protected String getTxAsString() {
// JBCACHE-1114 -- don't call toString() on tx or it can lead to stack
overflow
- if (tx == null)
- {
+ if (tx == null) {
return null;
}
@@ -1037,8 +868,7 @@
}
}
- private class LocalSynchronizationHandler extends RemoteSynchronizationHandler
- {
+ private class LocalSynchronizationHandler extends RemoteSynchronizationHandler {
private boolean localRollbackOnly = true;
// a VERY strange situation where a tx has remote origins, but since certain buddy
group org methods perform local
// cleanups even when remotely triggered, and optimistic locking is used, you end
up with an implicit local tx.
@@ -1052,35 +882,30 @@
* <p/>
* a VERY strange situation where a tx has remote origins, but since certain buddy
group org methods perform local
* cleanups even when remotely triggered, and optimistic locking is used, you end
up with an implicit local tx.
- * This is STILL remotely originating though and this needs to be made explicit
here.
- * this can be checked by inspecting the InvocationContext.isOriginLocal() at the
time of registering the sync.
+ * This is STILL remotely originating though and this needs to be made explicit
here. this can be checked by
+ * inspecting the InvocationContext.isOriginLocal() at the time of registering the
sync.
*
* @param gtx
* @param tx
* @param remoteLocal
*/
- LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx,
TransactionContext transactionContext, boolean remoteLocal)
- {
+ LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx,
TransactionContext transactionContext, boolean remoteLocal) {
super(gtx, tx, transactionContext);
this.remoteLocal = remoteLocal;
}
@Override
- public void beforeCompletion()
- {
+ public void beforeCompletion() {
super.beforeCompletion();
ctx.setOriginLocal(!remoteLocal); // this is the LOCAL sync handler after all!
// fetch the modifications before the transaction is committed
// (and thus removed from the txTable)
setTransactionalContext(tx, gtx, transactionContext, ctx);
- if (!transactionContext.hasModifications())
- {
+ if (!transactionContext.hasModifications()) {
if (trace) log.trace("No modifications in this tx. Skipping
beforeCompletion()");
modifications = Collections.emptyList();
return;
- }
- else
- {
+ } else {
modifications = transactionContext.getModifications();
}
@@ -1090,10 +915,8 @@
transactionalOptions.setSuppressEventNotification(originalOptions.isSuppressEventNotification());
ctx.setOptionOverrides(transactionalOptions);
- try
- {
- switch (tx.getStatus())
- {
+ try {
+ switch (tx.getStatus()) {
// if we are active or preparing then we can go ahead
case Status.STATUS_ACTIVE:
case Status.STATUS_PREPARING:
@@ -1101,10 +924,8 @@
Object result = isOnePhaseCommit() ? null : runPreparePhase(ctx, gtx,
modifications);
- if (result instanceof Throwable)
- {
- if (log.isDebugEnabled())
- {
+ if (result instanceof Throwable) {
+ if (log.isDebugEnabled()) {
log.debug("Transaction needs to be rolled back - the cache
returned an instance of Throwable for this prepare call (tx=" + tx + " and
gtx=" + gtx + ")", (Throwable) result);
}
tx.setRollbackOnly();
@@ -1115,28 +936,21 @@
throw new CacheException("transaction " + tx + " in
status " + tx.getStatus() + " unable to start transaction");
}
}
- catch (Throwable t)
- {
+ catch (Throwable t) {
if (log.isWarnEnabled()) log.warn("Caught exception, will now set
transaction to roll back", t);
- try
- {
+ try {
tx.setRollbackOnly();
}
- catch (SystemException se)
- {
+ catch (SystemException se) {
throw new RuntimeException("setting tx rollback failed ", se);
}
- if (t instanceof RuntimeException)
- {
+ if (t instanceof RuntimeException) {
throw (RuntimeException) t;
- }
- else
- {
+ } else {
throw new RuntimeException("", t);
}
}
- finally
- {
+ finally {
localRollbackOnly = false;
setTransactionalContext(null, null, null, ctx);
ctx.setOptionOverrides(originalOptions);
@@ -1144,41 +958,35 @@
}
@Override
- public void afterCompletion(int status)
- {
+ public void afterCompletion(int status) {
// could happen if a rollback is called and beforeCompletion() doesn't get
called.
if (ctx == null) ctx = invocationContextContainer.get();
ctx.setLocalRollbackOnly(localRollbackOnly);
setTransactionalContext(tx, gtx, transactionContext, ctx);
if (transactionalOptions != null) ctx.setOptionOverrides(transactionalOptions);
- try
- {
+ try {
super.afterCompletion(status);
}
- finally
- {
+ finally {
ctx.setOptionOverrides(originalOptions);
}
}
@Override
- public String toString()
- {
+ public String toString() {
return "TxInterceptor.LocalSynchronizationHandler(gtx=" + gtx +
", tx=" + getTxAsString() + ")";
}
}
@ManagedOperation
- public void resetStatistics()
- {
+ public void resetStatistics() {
prepares = 0;
commits = 0;
rollbacks = 0;
}
@ManagedOperation
- public Map<String, Object> dumpStatistics()
- {
+ public Map<String, Object> dumpStatistics() {
Map<String, Object> retval = new HashMap<String, Object>(3);
retval.put("Prepares", prepares);
retval.put("Commits", commits);
@@ -1187,38 +995,32 @@
}
@ManagedAttribute
- public boolean getStatisticsEnabled()
- {
+ public boolean getStatisticsEnabled() {
return this.statsEnabled;
}
@ManagedAttribute
- public void setStatisticsEnabled(boolean enabled)
- {
+ public void setStatisticsEnabled(boolean enabled) {
this.statsEnabled = enabled;
}
@ManagedAttribute(description = "number of transaction prepares")
- public long getPrepares()
- {
+ public long getPrepares() {
return prepares;
}
@ManagedAttribute(description = "number of transaction commits")
- public long getCommits()
- {
+ public long getCommits() {
return commits;
}
@ManagedAttribute(description = "number of transaction rollbacks")
- public long getRollbacks()
- {
+ public long getRollbacks() {
return rollbacks;
}
@ManagedAttribute(name = "numberOfSyncsRegistered", writable = false,
description = "number of transaction synchronizations currently registered")
- public int getNumberOfSyncsRegistered()
- {
+ public int getNumberOfSyncsRegistered() {
return transactions.size();
}
}
Modified:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -21,24 +21,6 @@
*/
package org.jboss.cache.marshall;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.TreeSet;
-
-import org.apache.commons.httpclient.cookie.IgnoreCookiesSpec;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.Region;
@@ -46,23 +28,27 @@
import org.jboss.cache.buddyreplication.GravitateResult;
import org.jboss.cache.commands.CommandsFactory;
import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.commands.WriteCommand;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.optimistic.DefaultDataVersion;
import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.TransactionLog.LogEntry;
import org.jboss.cache.util.FastCopyHashMap;
import org.jboss.cache.util.Immutables;
import org.jgroups.Address;
import org.jgroups.stack.IpAddress;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.*;
+
/**
* An enhanced marshaller for RPC calls between CacheImpl instances.
*
* @author <a href="mailto:manik AT jboss DOT org">Manik Surtani (manik
AT jboss DOT org)</a>
*/
-public class CacheMarshaller200 extends AbstractMarshaller
-{
+public class CacheMarshaller200 extends AbstractMarshaller {
// magic numbers
protected static final int MAGICNUMBER_METHODCALL = 1;
protected static final int MAGICNUMBER_FQN = 2;
@@ -93,9 +79,6 @@
protected static final int MAGICNUMBER_FLOAT = 27;
protected static final int MAGICNUMBER_DOUBLE = 28;
protected static final int MAGICNUMBER_OBJECT = 29;
- protected static final int MAGICNUMBER_TXLOG_ENTRY = 50;
- protected static final int MAGICNUMBER_REQUEST_IGNORED_RESPONSE = 51;
- protected static final int MAGICNUMBER_EXTENDED_RESPONSE = 52;
protected static final int MAGICNUMBER_NULL = 99;
protected static final int MAGICNUMBER_SERIALIZABLE = 100;
protected static final int MAGICNUMBER_REF = 101;
@@ -103,8 +86,7 @@
protected static final InactiveRegionException IRE = new
InactiveRegionException("Cannot unmarshall to an inactive region");
- public CacheMarshaller200()
- {
+ public CacheMarshaller200() {
initLogger();
// enabled, since this is always enabled in JBC 2.0.0.
useRefs = true;
@@ -113,44 +95,35 @@
protected CommandsFactory commandsFactory;
@Inject
- public void injectCommandsFactory(CommandsFactory commandsFactory)
- {
+ public void injectCommandsFactory(CommandsFactory commandsFactory) {
this.commandsFactory = commandsFactory;
}
// -------- AbstractMarshaller interface
- public void objectToObjectStream(Object o, ObjectOutputStream out) throws Exception
- {
- if (useRegionBasedMarshalling)
- {
+ public void objectToObjectStream(Object o, ObjectOutputStream out) throws Exception {
+ if (useRegionBasedMarshalling) {
Fqn region = null;
- if (o instanceof RegionalizedReturnValue)
- {
+ if (o instanceof RegionalizedReturnValue) {
RegionalizedReturnValue rrv = (RegionalizedReturnValue) o;
region = rrv.region;
o = rrv.returnValue;
- }
- else if (o instanceof ReplicableCommand)
- {
+ } else if (o instanceof ReplicableCommand) {
ReplicableCommand marshallableCommand = (ReplicableCommand) o;
region = extractFqnRegion(marshallableCommand);
}
if (trace) log.trace("Region based call. Using region " + region);
objectToObjectStream(o, out, region);
- }
- else
- {
+ } else {
// not region based!
objectToObjectStream(o, out, null);
}
}
@Override
- public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream
in) throws Exception
- {
+ public RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream
in) throws Exception {
// parse the stream as per normal.
Object[] retVal = objectFromObjectStreamRegionBased(in);
RegionalizedMethodCall rmc = new RegionalizedMethodCall();
@@ -160,14 +133,10 @@
}
- public Object objectFromObjectStream(ObjectInputStream in) throws Exception
- {
- if (useRegionBasedMarshalling)
- {
+ public Object objectFromObjectStream(ObjectInputStream in) throws Exception {
+ if (useRegionBasedMarshalling) {
return objectFromObjectStreamRegionBased(in)[0];
- }
- else
- {
+ } else {
UnmarshalledReferences refMap = useRefs ? new UnmarshalledReferences() : null;
Object retValue = unmarshallObject(in, defaultClassLoader, refMap, false);
if (trace) log.trace("Unmarshalled object " + retValue);
@@ -175,8 +144,7 @@
}
}
- public void objectToObjectStream(Object o, ObjectOutputStream out, Fqn region) throws
Exception
- {
+ public void objectToObjectStream(Object o, ObjectOutputStream out, Fqn region) throws
Exception {
if (trace) log.trace("Marshalling object " + o);
Map<Object, Integer> refMap = useRefs ? new IdentityHashMap<Object,
Integer>() : null;
ClassLoader toUse = defaultClassLoader;
@@ -184,8 +152,7 @@
ClassLoader old = current.getContextClassLoader();
if (old != null) toUse = old;
- try
- {
+ try {
if (useRegionBasedMarshalling) // got to check again in case this meth is called
directly
{
if (trace) log.trace("Writing region " + region + " to
stream");
@@ -194,20 +161,16 @@
if (r != null && r.getClassLoader() != null) toUse =
r.getClassLoader();
current.setContextClassLoader(toUse);
marshallObject(region, out, refMap);
- }
- else
- {
+ } else {
current.setContextClassLoader(toUse);
}
marshallObject(o, out, refMap);
}
- catch (Exception th)
- {
- if(log.isErrorEnabled()) log.error("Error while marshalling object: "
+ o, th);
+ catch (Exception th) {
+ if (log.isErrorEnabled()) log.error("Error while marshalling object: "
+ o, th);
throw th;
}
- finally
- {
+ finally {
if (log.isTraceEnabled()) log.trace("Done serializing object: " + o);
current.setContextClassLoader(old);
}
@@ -215,21 +178,18 @@
/**
* @param in
- * @return a 2-object array. The first one is the unmarshalled object and the 2nd is
an Fqn that relates to the region used. If region-based marshalling is not used, the 2nd
value is null.
+ * @return a 2-object array. The first one is the unmarshalled object and the 2nd is
an Fqn that relates to the
+ * region used. If region-based marshalling is not used, the 2nd value is
null.
* @throws Exception
*/
- protected Object[] objectFromObjectStreamRegionBased(ObjectInputStream in) throws
Exception
- {
+ protected Object[] objectFromObjectStreamRegionBased(ObjectInputStream in) throws
Exception {
UnmarshalledReferences refMap = useRefs ? new UnmarshalledReferences() : null;
Object o = unmarshallObject(in, refMap);
Fqn regionFqn = null;
- if (o == null)
- {
+ if (o == null) {
// a null region. Could happen. Use std marshalling.
log.trace("Unmarshalled region as null. Not using a context class loader
to unmarshall.");
- }
- else
- {
+ } else {
regionFqn = (Fqn) o;
}
@@ -238,18 +198,14 @@
Region region = null;
Object[] retValue = {null, null};
- if (regionFqn != null)
- {
+ if (regionFqn != null) {
region = findRegion(regionFqn);
}
- if (region == null)
- {
+ if (region == null) {
if (log.isDebugEnabled())
log.debug("Region does not exist for Fqn " + regionFqn + " -
not using a context classloader.");
retValue[0] = unmarshallObject(in, defaultClassLoader, refMap, false);
- }
- else
- {
+ } else {
retValue[0] = unmarshallObject(in, region.getClassLoader(), refMap, true);
retValue[1] = regionFqn;
}
@@ -257,35 +213,24 @@
return retValue;
}
- private Region findRegion(Fqn fqn) throws InactiveRegionException
- {
+ private Region findRegion(Fqn fqn) throws InactiveRegionException {
Region region = regionManager.getValidMarshallingRegion(fqn);
- if (region != null)
- {
+ if (region != null) {
Status status = region.getStatus();
- if (status == Status.INACTIVATING || status == Status.INACTIVE)
- {
- if (log.isDebugEnabled())
- {
+ if (status == Status.INACTIVATING || status == Status.INACTIVE) {
+ if (log.isDebugEnabled()) {
throw new InactiveRegionException("Cannot unmarshall message for
region " + fqn + ". This region is inactive.");
- }
- else
- {
+ } else {
throw IRE;
}
}
- }
- else if (defaultInactive)
- {
- if (log.isDebugEnabled())
- {
+ } else if (defaultInactive) {
+ if (log.isDebugEnabled()) {
// No region but default inactive means region is inactive
throw new InactiveRegionException("Cannot unmarshall message for region
" + fqn + ". By default region " + fqn
+ " is inactive.");
- }
- else
- {
+ } else {
throw IRE;
}
}
@@ -293,8 +238,7 @@
return region;
}
- private Fqn extractFqnRegion(ReplicableCommand cmd) throws Exception
- {
+ private Fqn extractFqnRegion(ReplicableCommand cmd) throws Exception {
Fqn fqn = extractFqn(cmd);
Region r = regionManager.getValidMarshallingRegion(fqn);
@@ -304,272 +248,172 @@
// --------- Marshalling methods
@SuppressWarnings("deprecation")
- protected void marshallObject(Object o, ObjectOutputStream out, Map<Object,
Integer> refMap) throws Exception
- {
- if (o == null)
- {
+ protected void marshallObject(Object o, ObjectOutputStream out, Map<Object,
Integer> refMap) throws Exception {
+ if (o == null) {
out.writeByte(MAGICNUMBER_NULL);
- }
- else if (useRefs && refMap.containsKey(o))// see if this object has been
marshalled before.
+ } else if (useRefs && refMap.containsKey(o))// see if this object has been
marshalled before.
{
out.writeByte(MAGICNUMBER_REF);
writeReference(out, refMap.get(o));
- }
- else if (o instanceof ReplicableCommand)
- {
+ } else if (o instanceof ReplicableCommand) {
ReplicableCommand command = (ReplicableCommand) o;
- if (command.getCommandId() > -1)
- {
+ if (command.getCommandId() > -1) {
out.writeByte(MAGICNUMBER_METHODCALL);
marshallCommand(command, out, refMap);
- }
- else
- {
+ } else {
throw new IllegalArgumentException("MethodCall does not have a valid
method id. Was this method call created with MethodCallFactory?");
}
- }
- else if (o instanceof org.jgroups.blocks.MethodCall)
- {
+ } else if (o instanceof org.jgroups.blocks.MethodCall) {
throw new IllegalArgumentException("Usage of a legacy MethodCall
object!!");
- }
- else if (o instanceof MarshalledValue)
- {
+ } else if (o instanceof MarshalledValue) {
out.writeByte(MAGICNUMBER_MARSHALLEDVALUE);
((MarshalledValue) o).writeExternal(out);
- }
- else if (o instanceof Fqn)
- {
+ } else if (o instanceof Fqn) {
out.writeByte(MAGICNUMBER_FQN);
if (useRefs) writeReference(out, createReference(o, refMap));
marshallFqn((Fqn) o, out, refMap);
- }
- else if (o instanceof GlobalTransaction)
- {
+ } else if (o instanceof GlobalTransaction) {
out.writeByte(MAGICNUMBER_GTX);
if (useRefs) writeReference(out, createReference(o, refMap));
marshallGlobalTransaction((GlobalTransaction) o, out, refMap);
- }
- else if (o instanceof LogEntry)
- {
- out.writeByte(MAGICNUMBER_TXLOG_ENTRY);
- marshallLogEntry((LogEntry)o, out, refMap);
- }
- else if (o instanceof IpAddress)
- {
+ } else if (o instanceof IpAddress) {
out.writeByte(MAGICNUMBER_IPADDRESS);
marshallIpAddress((IpAddress) o, out);
- }
- else if (o instanceof DefaultDataVersion)
- {
+ } else if (o instanceof DefaultDataVersion) {
out.writeByte(MAGICNUMBER_DEFAULT_DATA_VERSION);
marshallDefaultDataVersion((DefaultDataVersion) o, out);
- }
- else if (o.getClass().equals(ArrayList.class))
- {
+ } else if (o.getClass().equals(ArrayList.class)) {
out.writeByte(MAGICNUMBER_ARRAY_LIST);
marshallCollection((Collection) o, out, refMap);
- }
- else if (o.getClass().equals(LinkedList.class))
- {
+ } else if (o.getClass().equals(LinkedList.class)) {
out.writeByte(MAGICNUMBER_LINKED_LIST);
marshallCollection((Collection) o, out, refMap);
- }
- else if (o.getClass().equals(HashMap.class))
- {
+ } else if (o.getClass().equals(HashMap.class)) {
out.writeByte(MAGICNUMBER_HASH_MAP);
marshallMap((Map) o, out, refMap);
- }
- else if (o.getClass().equals(TreeMap.class))
- {
+ } else if (o.getClass().equals(TreeMap.class)) {
out.writeByte(MAGICNUMBER_TREE_MAP);
marshallMap((Map) o, out, refMap);
- }
- else if (o.getClass().equals(FastCopyHashMap.class))
- {
+ } else if (o.getClass().equals(FastCopyHashMap.class)) {
out.writeByte(MAGICNUMBER_FASTCOPY_HASHMAP);
marshallMap((Map) o, out, refMap);
- }
- else if (o instanceof Map && Immutables.isImmutable(o))
- {
+ } else if (o instanceof Map && Immutables.isImmutable(o)) {
out.writeByte(MAGICNUMBER_IMMUTABLE_MAPCOPY);
marshallMap((Map) o, out, refMap);
- }
- else if (o.getClass().equals(HashSet.class))
- {
+ } else if (o.getClass().equals(HashSet.class)) {
out.writeByte(MAGICNUMBER_HASH_SET);
marshallCollection((Collection) o, out, refMap);
- }
- else if (o.getClass().equals(TreeSet.class))
- {
+ } else if (o.getClass().equals(TreeSet.class)) {
out.writeByte(MAGICNUMBER_TREE_SET);
marshallCollection((Collection) o, out, refMap);
- }
- else if (o instanceof Boolean)
- {
+ } else if (o instanceof Boolean) {
out.writeByte(MAGICNUMBER_BOOLEAN);
out.writeBoolean(((Boolean) o).booleanValue());
- }
- else if (o instanceof Integer)
- {
+ } else if (o instanceof Integer) {
out.writeByte(MAGICNUMBER_INTEGER);
out.writeInt(((Integer) o).intValue());
- }
- else if (o instanceof Long)
- {
+ } else if (o instanceof Long) {
out.writeByte(MAGICNUMBER_LONG);
out.writeLong(((Long) o).longValue());
- }
- else if (o instanceof Short)
- {
+ } else if (o instanceof Short) {
out.writeByte(MAGICNUMBER_SHORT);
out.writeShort(((Short) o).shortValue());
- }
- else if (o instanceof String)
- {
+ } else if (o instanceof String) {
out.writeByte(MAGICNUMBER_STRING);
if (useRefs) writeReference(out, createReference(o, refMap));
marshallString((String) o, out);
- }
- else if (o instanceof NodeDataMarker)
- {
+ } else if (o instanceof NodeDataMarker) {
out.writeByte(MAGICNUMBER_NODEDATA_MARKER);
((Externalizable) o).writeExternal(out);
- }
- else if (o instanceof NodeDataExceptionMarker)
- {
+ } else if (o instanceof NodeDataExceptionMarker) {
out.writeByte(MAGICNUMBER_NODEDATA_EXCEPTION_MARKER);
((Externalizable) o).writeExternal(out);
- }
- else if (o instanceof NodeData)
- {
+ } else if (o instanceof NodeData) {
out.writeByte(MAGICNUMBER_NODEDATA);
((Externalizable) o).writeExternal(out);
- }
- else if (o instanceof GravitateResult)
- {
+ } else if (o instanceof GravitateResult) {
out.writeByte(MAGICNUMBER_GRAVITATERESULT);
marshallGravitateResult((GravitateResult) o, out, refMap);
- }
- else if (o instanceof RequestIgnoredResponse)
- {
- out.writeByte(MAGICNUMBER_REQUEST_IGNORED_RESPONSE);
- }
- else if (o instanceof ExtendedResponse)
- {
- out.writeByte(MAGICNUMBER_EXTENDED_RESPONSE);
- marshallExtendedResponse((ExtendedResponse)o, out, refMap);
- }
- else if (o instanceof Serializable)
- {
- if (trace)
- {
+ } else if (o instanceof Serializable) {
+ if (trace) {
log.trace("Not optimum: using object serialization for " +
o.getClass());
}
out.writeByte(MAGICNUMBER_SERIALIZABLE);
if (useRefs) writeReference(out, createReference(o, refMap));
out.writeObject(o);
- }
- else
- {
+ } else {
throw new Exception("Don't know how to marshall object of type " +
o.getClass());
}
}
- private void marshallExtendedResponse(ExtendedResponse response, ObjectOutputStream
out, Map<Object, Integer> refMap) throws Exception
- {
- out.writeBoolean(response.isReplayIgnoredRequests());
- marshallObject(response.getResponse(), out, refMap);
- }
-
- private void marshallLogEntry(LogEntry log, ObjectOutputStream out, Map<Object,
Integer> refMap) throws Exception
- {
- marshallObject(log.getTransaction(), out, refMap);
- marshallObject(log.getModifications(), out, refMap);
- }
-
- private void marshallGravitateResult(GravitateResult gravitateResult,
ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
- {
+ private void marshallGravitateResult(GravitateResult gravitateResult,
ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception {
marshallObject(gravitateResult.isDataFound(), out, refMap);
- if (gravitateResult.isDataFound())
- {
+ if (gravitateResult.isDataFound()) {
marshallObject(gravitateResult.getNodeData(), out, refMap);
marshallObject(gravitateResult.getBuddyBackupFqn(), out, refMap);
}
}
- private int createReference(Object o, Map<Object, Integer> refMap)
- {
+ private int createReference(Object o, Map<Object, Integer> refMap) {
int reference = refMap.size();
refMap.put(o, reference);
return reference;
}
- protected void marshallString(String s, ObjectOutputStream out) throws Exception
- {
+ protected void marshallString(String s, ObjectOutputStream out) throws Exception {
//StringUtil.saveString(out, s);
out.writeObject(s);
}
- private void marshallCommand(ReplicableCommand command, ObjectOutputStream out,
Map<Object, Integer> refMap) throws Exception
- {
+ private void marshallCommand(ReplicableCommand command, ObjectOutputStream out,
Map<Object, Integer> refMap) throws Exception {
out.writeShort(command.getCommandId());
Object[] args = command.getParameters();
byte numArgs = (byte) (args == null ? 0 : args.length);
out.writeByte(numArgs);
- for (int i = 0; i < numArgs; i++)
- {
+ for (int i = 0; i < numArgs; i++) {
marshallObject(args[i], out, refMap);
}
}
- private void marshallGlobalTransaction(GlobalTransaction globalTransaction,
ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
- {
+ private void marshallGlobalTransaction(GlobalTransaction globalTransaction,
ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception {
out.writeLong(globalTransaction.getId());
marshallObject(globalTransaction.getAddress(), out, refMap);
}
- protected void marshallFqn(Fqn fqn, ObjectOutputStream out, Map<Object, Integer>
refMap) throws Exception
- {
+ protected void marshallFqn(Fqn fqn, ObjectOutputStream out, Map<Object, Integer>
refMap) throws Exception {
boolean isRoot = fqn.isRoot();
out.writeBoolean(isRoot);
- if (!isRoot)
- {
+ if (!isRoot) {
out.writeShort(fqn.size());
- for (Object o : fqn.peekElements())
- {
+ for (Object o : fqn.peekElements()) {
marshallObject(o, out, refMap);
}
}
}
- private void marshallIpAddress(IpAddress ipAddress, ObjectOutputStream out) throws
Exception
- {
+ private void marshallIpAddress(IpAddress ipAddress, ObjectOutputStream out) throws
Exception {
ipAddress.writeExternal(out);
}
@SuppressWarnings("unchecked")
- private void marshallCollection(Collection c, ObjectOutputStream out, Map refMap)
throws Exception
- {
+ private void marshallCollection(Collection c, ObjectOutputStream out, Map refMap)
throws Exception {
writeUnsignedInt(out, c.size());
- for (Object o : c)
- {
+ for (Object o : c) {
marshallObject(o, out, refMap);
}
}
@SuppressWarnings("unchecked")
- private void marshallMap(Map map, ObjectOutputStream out, Map<Object, Integer>
refMap) throws Exception
- {
+ private void marshallMap(Map map, ObjectOutputStream out, Map<Object, Integer>
refMap) throws Exception {
int mapSize = map.size();
writeUnsignedInt(out, mapSize);
if (mapSize == 0) return;
- for (Map.Entry me : (Set<Map.Entry>) map.entrySet())
- {
+ for (Map.Entry me : (Set<Map.Entry>) map.entrySet()) {
marshallObject(me.getKey(), out, refMap);
marshallObject(me.getValue(), out, refMap);
}
@@ -577,45 +421,35 @@
// --------- Unmarshalling methods
- protected Object unmarshallObject(ObjectInputStream in, ClassLoader loader,
UnmarshalledReferences refMap, boolean overrideContextClassloaderOnThread) throws
Exception
- {
- if (loader == null)
- {
+ protected Object unmarshallObject(ObjectInputStream in, ClassLoader loader,
UnmarshalledReferences refMap, boolean overrideContextClassloaderOnThread) throws
Exception {
+ if (loader == null) {
return unmarshallObject(in, refMap);
- }
- else
- {
+ } else {
Thread currentThread = Thread.currentThread();
ClassLoader old = currentThread.getContextClassLoader();
- try
- {
+ try {
// only do this if we haven't already set a context class loader
elsewhere.
if (overrideContextClassloaderOnThread || old == null)
currentThread.setContextClassLoader(loader);
return unmarshallObject(in, refMap);
}
- finally
- {
+ finally {
if (overrideContextClassloaderOnThread || old == null)
currentThread.setContextClassLoader(old);
}
}
}
- protected Object unmarshallObject(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ protected Object unmarshallObject(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
byte magicNumber = in.readByte();
int reference = 0;
Object retVal;
- switch (magicNumber)
- {
+ switch (magicNumber) {
case MAGICNUMBER_NULL:
return null;
case MAGICNUMBER_REF:
- if (useRefs)
- {
+ if (useRefs) {
reference = readReference(in);
return refMap.getReferencedObject(reference);
- }
- else break;
+ } else break;
case MAGICNUMBER_SERIALIZABLE:
if (useRefs) reference = readReference(in);
retVal = in.readObject();
@@ -638,8 +472,6 @@
retVal = unmarshallGlobalTransaction(in, refMap);
if (useRefs) refMap.putReferencedObject(reference, retVal);
return retVal;
- case MAGICNUMBER_TXLOG_ENTRY:
- return unmarshallLogEntry(in, refMap);
case MAGICNUMBER_IPADDRESS:
retVal = unmarshallIpAddress(in);
return retVal;
@@ -691,13 +523,8 @@
return retVal;
case MAGICNUMBER_GRAVITATERESULT:
return unmarshallGravitateResult(in, refMap);
- case MAGICNUMBER_REQUEST_IGNORED_RESPONSE:
- return new RequestIgnoredResponse();
- case MAGICNUMBER_EXTENDED_RESPONSE:
- return unmarshallExtendedResponse(in, refMap);
default:
- if (log.isErrorEnabled())
- {
+ if (log.isErrorEnabled()) {
log.error("Unknown Magic Number " + magicNumber);
}
throw new Exception("Unknown magic number " + magicNumber);
@@ -705,60 +532,34 @@
throw new Exception("Unknown magic number " + magicNumber);
}
- private ExtendedResponse unmarshallExtendedResponse(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception
- {
- boolean replayIgnoredRequests = in.readBoolean();
- ExtendedResponse response = new ExtendedResponse(unmarshallObject(in, refMap));
- response.setReplayIgnoredRequests(replayIgnoredRequests);
-
- return response;
- }
-
- @SuppressWarnings("unchecked")
- private LogEntry unmarshallLogEntry(ObjectInputStream in, UnmarshalledReferences
refMap) throws Exception
- {
- GlobalTransaction gtx = (GlobalTransaction)unmarshallObject(in, refMap);
- List<WriteCommand> mods = (List<WriteCommand>)unmarshallObject(in,
refMap);
-
- return new LogEntry(gtx, mods);
- }
-
- private FastCopyHashMap unmarshallFastCopyHashMap(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception
- {
+ private FastCopyHashMap unmarshallFastCopyHashMap(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception {
FastCopyHashMap map = new FastCopyHashMap();
populateFromStream(in, refMap, map);
return map;
}
@SuppressWarnings("unchecked")
- private GravitateResult unmarshallGravitateResult(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception
- {
+ private GravitateResult unmarshallGravitateResult(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception {
Boolean found = (Boolean) unmarshallObject(in, refMap);
- if (found)
- {
+ if (found) {
List<NodeData> stuff = (List<NodeData>) unmarshallObject(in,
refMap);
Fqn fqn = (Fqn) unmarshallObject(in, refMap);
return GravitateResult.subtreeResult(stuff, fqn);
- }
- else
- {
+ } else {
return GravitateResult.noDataFound();
}
}
- protected String unmarshallString(ObjectInputStream in) throws Exception
- {
+ protected String unmarshallString(ObjectInputStream in) throws Exception {
return (String) in.readObject();
}
- private ReplicableCommand unmarshallCommand(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception
- {
+ private ReplicableCommand unmarshallCommand(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception {
short methodId = in.readShort();
byte numArgs = in.readByte();
Object[] args = null;
- if (numArgs > 0)
- {
+ if (numArgs > 0) {
args = new Object[numArgs];
for (int i = 0; i < numArgs; i++) args[i] = unmarshallObject(in, refMap);
}
@@ -766,8 +567,7 @@
return commandsFactory.fromStream(methodId, args);
}
- private GlobalTransaction unmarshallGlobalTransaction(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception
- {
+ private GlobalTransaction unmarshallGlobalTransaction(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception {
GlobalTransaction gtx = new GlobalTransaction();
long id = in.readLong();
Object address = unmarshallObject(in, refMap);
@@ -776,115 +576,97 @@
return gtx;
}
- protected Fqn unmarshallFqn(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ protected Fqn unmarshallFqn(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
boolean isRoot = in.readBoolean();
Fqn fqn;
- if (!isRoot)
- {
+ if (!isRoot) {
int numElements = in.readShort();
List<Object> elements = new ArrayList<Object>(numElements);
- for (int i = 0; i < numElements; i++)
- {
+ for (int i = 0; i < numElements; i++) {
elements.add(unmarshallObject(in, refMap));
}
fqn = Fqn.fromList(elements, true);
- }
- else
- {
+ } else {
fqn = Fqn.ROOT;
}
return fqn;
}
- private IpAddress unmarshallIpAddress(ObjectInputStream in) throws Exception
- {
+ private IpAddress unmarshallIpAddress(ObjectInputStream in) throws Exception {
IpAddress ipAddress = new IpAddress();
ipAddress.readExternal(in);
return ipAddress;
}
- private List unmarshallArrayList(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ private List unmarshallArrayList(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
int listSize = readUnsignedInt(in);
List list = new ArrayList(listSize);
populateFromStream(in, refMap, list, listSize);
return list;
}
- private List unmarshallLinkedList(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ private List unmarshallLinkedList(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
List list = new LinkedList();
populateFromStream(in, refMap, list, readUnsignedInt(in));
return list;
}
- private Map unmarshallHashMap(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ private Map unmarshallHashMap(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
Map map = new HashMap();
populateFromStream(in, refMap, map);
return map;
}
@SuppressWarnings("unchecked")
- private Map unmarshallMapCopy(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ private Map unmarshallMapCopy(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
// read in as a HashMap first
Map m = unmarshallHashMap(in, refMap);
return Immutables.immutableMapWrap(m);
}
- private Map unmarshallTreeMap(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ private Map unmarshallTreeMap(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
Map map = new TreeMap();
populateFromStream(in, refMap, map);
return map;
}
- private Set unmarshallHashSet(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ private Set unmarshallHashSet(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
Set set = new HashSet();
populateFromStream(in, refMap, set);
return set;
}
- private Set unmarshallTreeSet(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception
- {
+ private Set unmarshallTreeSet(ObjectInputStream in, UnmarshalledReferences refMap)
throws Exception {
Set set = new TreeSet();
populateFromStream(in, refMap, set);
return set;
}
@SuppressWarnings("unchecked")
- private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap,
Map mapToPopulate) throws Exception
- {
+ private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap,
Map mapToPopulate) throws Exception {
int size = readUnsignedInt(in);
for (int i = 0; i < size; i++) mapToPopulate.put(unmarshallObject(in, refMap),
unmarshallObject(in, refMap));
}
@SuppressWarnings("unchecked")
- private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap,
Set setToPopulate) throws Exception
- {
+ private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap,
Set setToPopulate) throws Exception {
int size = readUnsignedInt(in);
for (int i = 0; i < size; i++) setToPopulate.add(unmarshallObject(in, refMap));
}
@SuppressWarnings("unchecked")
- private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap,
List listToPopulate, int listSize) throws Exception
- {
+ private void populateFromStream(ObjectInputStream in, UnmarshalledReferences refMap,
List listToPopulate, int listSize) throws Exception {
for (int i = 0; i < listSize; i++) listToPopulate.add(unmarshallObject(in,
refMap));
}
@SuppressWarnings("deprecation")
- protected void marshallDefaultDataVersion(DefaultDataVersion ddv, ObjectOutputStream
out) throws Exception
- {
+ protected void marshallDefaultDataVersion(DefaultDataVersion ddv, ObjectOutputStream
out) throws Exception {
writeUnsignedLong(out, ddv.getRawVersion());
}
@SuppressWarnings("deprecation")
- protected DefaultDataVersion unmarshallDefaultDataVersion(ObjectInputStream in) throws
Exception
- {
+ protected DefaultDataVersion unmarshallDefaultDataVersion(ObjectInputStream in) throws
Exception {
return new DefaultDataVersion(readUnsignedLong(in));
}
@@ -895,8 +677,7 @@
* @return an int representing a reference in RefMap.
* @throws IOException propagated from the OIS
*/
- protected int readReference(ObjectInputStream in) throws IOException
- {
+ protected int readReference(ObjectInputStream in) throws IOException {
return in.readShort();
}
@@ -907,175 +688,134 @@
* @param reference the reference to write
* @throws java.io.IOException propagated from the OOS
*/
- protected void writeReference(ObjectOutputStream out, int reference) throws
IOException
- {
+ protected void writeReference(ObjectOutputStream out, int reference) throws
IOException {
out.writeShort(reference);
}
- protected int readUnsignedInt(ObjectInputStream in) throws IOException
- {
+ protected int readUnsignedInt(ObjectInputStream in) throws IOException {
return in.readInt();
}
- protected void writeUnsignedInt(ObjectOutputStream out, int i) throws IOException
- {
+ protected void writeUnsignedInt(ObjectOutputStream out, int i) throws IOException {
out.writeInt(i);
}
- protected long readUnsignedLong(ObjectInputStream in) throws IOException
- {
+ protected long readUnsignedLong(ObjectInputStream in) throws IOException {
return in.readLong();
}
- protected void writeUnsignedLong(ObjectOutputStream out, long i) throws IOException
- {
+ protected void writeUnsignedLong(ObjectOutputStream out, long i) throws IOException {
out.writeLong(i);
}
- protected Object unmarshallArray(ObjectInputStream in, UnmarshalledReferences refs)
throws Exception
- {
+ protected Object unmarshallArray(ObjectInputStream in, UnmarshalledReferences refs)
throws Exception {
int sz = readUnsignedInt(in);
byte type = in.readByte();
- switch (type)
- {
- case MAGICNUMBER_BOOLEAN:
- {
+ switch (type) {
+ case MAGICNUMBER_BOOLEAN: {
boolean isPrim = in.readBoolean();
- if (isPrim)
- {
+ if (isPrim) {
boolean[] a = new boolean[sz];
for (int i = 0; i < sz; i++) a[i] = in.readBoolean();
return a;
- }
- else
- {
+ } else {
Boolean[] a = new Boolean[sz];
for (int i = 0; i < sz; i++) a[i] = in.readBoolean();
return a;
}
}
- case MAGICNUMBER_INTEGER:
- {
+ case MAGICNUMBER_INTEGER: {
boolean isPrim = in.readBoolean();
- if (isPrim)
- {
+ if (isPrim) {
int[] a = new int[sz];
for (int i = 0; i < sz; i++) a[i] = in.readInt();
return a;
- }
- else
- {
+ } else {
Integer[] a = new Integer[sz];
for (int i = 0; i < sz; i++) a[i] = in.readInt();
return a;
}
}
- case MAGICNUMBER_LONG:
- {
+ case MAGICNUMBER_LONG: {
boolean isPrim = in.readBoolean();
- if (isPrim)
- {
+ if (isPrim) {
long[] a = new long[sz];
for (int i = 0; i < sz; i++) a[i] = in.readLong();
return a;
- }
- else
- {
+ } else {
Long[] a = new Long[sz];
for (int i = 0; i < sz; i++) a[i] = in.readLong();
return a;
}
}
- case MAGICNUMBER_CHAR:
- {
+ case MAGICNUMBER_CHAR: {
boolean isPrim = in.readBoolean();
- if (isPrim)
- {
+ if (isPrim) {
char[] a = new char[sz];
for (int i = 0; i < sz; i++) a[i] = in.readChar();
return a;
- }
- else
- {
+ } else {
Character[] a = new Character[sz];
for (int i = 0; i < sz; i++) a[i] = in.readChar();
return a;
}
}
- case MAGICNUMBER_BYTE:
- {
+ case MAGICNUMBER_BYTE: {
boolean isPrim = in.readBoolean();
- if (isPrim)
- {
+ if (isPrim) {
byte[] a = new byte[sz];
int bsize = 10240;
int offset = 0;
int bytesLeft = sz;
- while (bytesLeft > 0)
- {
+ while (bytesLeft > 0) {
int read = in.read(a, offset, Math.min(bsize, bytesLeft));
offset += read;
bytesLeft -= read;
}
return a;
- }
- else
- {
+ } else {
Byte[] a = new Byte[sz];
for (int i = 0; i < sz; i++) a[i] = in.readByte();
return a;
}
}
- case MAGICNUMBER_SHORT:
- {
+ case MAGICNUMBER_SHORT: {
boolean isPrim = in.readBoolean();
- if (isPrim)
- {
+ if (isPrim) {
short[] a = new short[sz];
for (int i = 0; i < sz; i++) a[i] = in.readShort();
return a;
- }
- else
- {
+ } else {
Short[] a = new Short[sz];
for (int i = 0; i < sz; i++) a[i] = in.readShort();
return a;
}
}
- case MAGICNUMBER_FLOAT:
- {
+ case MAGICNUMBER_FLOAT: {
boolean isPrim = in.readBoolean();
- if (isPrim)
- {
+ if (isPrim) {
float[] a = new float[sz];
for (int i = 0; i < sz; i++) a[i] = in.readFloat();
return a;
- }
- else
- {
+ } else {
Float[] a = new Float[sz];
for (int i = 0; i < sz; i++) a[i] = in.readFloat();
return a;
}
}
- case MAGICNUMBER_DOUBLE:
- {
+ case MAGICNUMBER_DOUBLE: {
boolean isPrim = in.readBoolean();
- if (isPrim)
- {
+ if (isPrim) {
double[] a = new double[sz];
for (int i = 0; i < sz; i++) a[i] = in.readDouble();
return a;
- }
- else
- {
+ } else {
Double[] a = new Double[sz];
for (int i = 0; i < sz; i++) a[i] = in.readDouble();
return a;
}
}
- case MAGICNUMBER_OBJECT:
- {
+ case MAGICNUMBER_OBJECT: {
Object[] a = new Object[sz];
for (int i = 0; i < sz; i++) a[i] = unmarshallObject(in, refs);
return a;
Modified:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -21,22 +21,7 @@
*/
package org.jboss.cache.marshall;
-import java.io.NotSerializableException;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.jboss.cache.InvocationContext;
-import org.jboss.cache.RPCManager;
-import org.jboss.cache.RPCManagerImpl.FlushTracker;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.VisitableCommand;
import org.jboss.cache.commands.remote.AnnounceBuddyPoolNameCommand;
@@ -46,30 +31,36 @@
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
import org.jboss.cache.invocation.InvocationContextContainer;
-import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.util.concurrent.BoundedExecutors;
-import org.jboss.cache.util.concurrent.ReclosableLatch;
import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.MessageListener;
-import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RspFilter;
import org.jgroups.util.Buffer;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import java.io.NotSerializableException;
+import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* A JGroups RPC dispatcher that knows how to deal with {@link
org.jboss.cache.commands.ReplicableCommand}s.
*
* @author Manik Surtani (<a href="mailto:manik AT jboss DOT org">manik
AT jboss DOT org</a>)
* @since 2.2.0
*/
-public class CommandAwareRpcDispatcher extends RpcDispatcher
-{
+public class CommandAwareRpcDispatcher extends RpcDispatcher {
protected InvocationContextContainer invocationContextContainer;
protected InterceptorChain interceptorChain;
protected ComponentRegistry componentRegistry;
@@ -77,27 +68,23 @@
private ExecutorService replicationProcessor;
private AtomicInteger replicationProcessorCount;
private boolean asyncSerial;
- private Configuration configuration;
- private RPCManager rpcManager;
private ReplicationObserver replicationObserver;
- public CommandAwareRpcDispatcher() {}
+ public CommandAwareRpcDispatcher() {
+ }
public CommandAwareRpcDispatcher(Channel channel, MessageListener l,
MembershipListener l2,
Object serverObj, InvocationContextContainer
container, InterceptorChain interceptorChain,
- ComponentRegistry componentRegistry, RPCManager
manager)
- {
+ ComponentRegistry componentRegistry) {
super(channel, l, l2, serverObj);
this.invocationContextContainer = container;
this.componentRegistry = componentRegistry;
this.interceptorChain = interceptorChain;
- this.rpcManager = manager;
trace = log.isTraceEnabled();
// what sort of a repl processor do we need?
Configuration c = componentRegistry.getComponent(Configuration.class);
- this.configuration = c;
replicationProcessor = c.getRuntimeConfig().getAsyncSerializationExecutor();
if (c.getCacheMode().isSynchronous() ||
(replicationProcessor == null && c.getSerializationExecutorPoolSize()
< 1) || requireSyncMarshalling(c)) // if an executor has not been injected and the pool
size is set
@@ -105,29 +92,23 @@
// in-process thread. Not async.
replicationProcessor = new WithinThreadExecutor();
asyncSerial = false;
- }
- else
- {
+ } else {
asyncSerial = true;
- if (replicationProcessor == null)
- {
+ if (replicationProcessor == null) {
replicationProcessorCount = new AtomicInteger(0);
replicationProcessor = BoundedExecutors.newFixedThreadPool(c.isUseReplQueue()
? 1 : c.getSerializationExecutorPoolSize(),
- new ThreadFactory()
- {
- public Thread newThread(Runnable r)
- {
- return new Thread(r, "AsyncReplicationProcessor-" +
replicationProcessorCount.incrementAndGet());
- }
- }, c.getSerializationExecutorQueueSize()
+ new
ThreadFactory() {
+ public Thread
newThread(Runnable r) {
+ return new
Thread(r, "AsyncReplicationProcessor-" +
replicationProcessorCount.incrementAndGet());
+ }
+ },
c.getSerializationExecutorQueueSize()
);
}
}
}
- public ReplicationObserver setReplicationObserver(ReplicationObserver
replicationObserver)
- {
+ public ReplicationObserver setReplicationObserver(ReplicationObserver
replicationObserver) {
ReplicationObserver result = this.replicationObserver;
this.replicationObserver = replicationObserver;
return result;
@@ -138,15 +119,12 @@
* which might cause the Commit command to be send before the Prepare command, so
replication will fail. This is not
* the same for async <b>pessimistic/mvcc</b> replication, as this uses a
1PC.
*/
- private boolean requireSyncMarshalling(Configuration c)
- {
+ private boolean requireSyncMarshalling(Configuration c) {
boolean enforceSerialMarshalling =
c.getNodeLockingScheme().equals(Configuration.NodeLockingScheme.OPTIMISTIC)
&& !c.getCacheMode().isInvalidation();
- if (enforceSerialMarshalling)
- {
- if (c.getSerializationExecutorPoolSize() > 1 && log.isWarnEnabled())
- {
+ if (enforceSerialMarshalling) {
+ if (c.getSerializationExecutorPoolSize() > 1 && log.isWarnEnabled())
{
log.warn("Async optimistic caches do not support serialization
pools.");
}
if (trace) log.trace("Disbaling serial marshalling for opt async
cache");
@@ -155,30 +133,24 @@
}
@Override
- public void stop()
- {
+ public void stop() {
replicationProcessor.shutdownNow();
- try
- {
+ try {
replicationProcessor.awaitTermination(60, TimeUnit.SECONDS);
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
super.stop();
}
- protected boolean isValid(Message req)
- {
- if (server_obj == null)
- {
+ protected boolean isValid(Message req) {
+ if (server_obj == null) {
log.error("no method handler is registered. Discarding request.");
return false;
}
- if (req == null || req.getLength() == 0)
- {
+ if (req == null || req.getLength() == 0) {
log.error("message or message buffer is null");
return false;
}
@@ -187,14 +159,13 @@
}
/**
- * Similar to {@link #callRemoteMethods(java.util.Vector,
org.jgroups.blocks.MethodCall, int, long, boolean, boolean, org.jgroups.blocks.RspFilter)}
except that this version
- * is aware of {@link org.jboss.cache.commands.ReplicableCommand} objects.
+ * Similar to {@link #callRemoteMethods(java.util.Vector,
org.jgroups.blocks.MethodCall, int, long, boolean, boolean,
+ * org.jgroups.blocks.RspFilter)} except that this version is aware of {@link
org.jboss.cache.commands.ReplicableCommand}
+ * objects.
*/
public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand
command, int mode, long timeout,
- boolean anycasting, boolean oob, RspFilter filter)
throws NotSerializableException, ExecutionException, InterruptedException
- {
- if (dests != null && dests.isEmpty())
- {
+ boolean anycasting, boolean oob, RspFilter filter)
throws NotSerializableException, ExecutionException, InterruptedException {
+ if (dests != null && dests.isEmpty()) {
// don't send if dest list is empty
if (trace) log.trace("Destination list is empty: no need to send
message");
return new RspList();
@@ -204,16 +175,12 @@
log.trace(new StringBuilder("dests=").append(dests).append(",
command=").append(command).
append(", mode=").append(mode).append(",
timeout=").append(timeout));
- boolean supportReplay = configuration.isNonBlockingStateTransfer();
- ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode,
timeout, anycasting, supportReplay, filter);
+ ReplicationTask replicationTask = new ReplicationTask(command, oob, dests, mode,
timeout, anycasting, filter);
Future<RspList> response = replicationProcessor.submit(replicationTask);
- if (asyncSerial)
- {
+ if (asyncSerial) {
// don't care about the response. return.
return null;
- }
- else
- {
+ } else {
RspList retval = response.get();
if (retval.isEmpty() || containsOnlyNulls(retval))
return null;
@@ -222,10 +189,8 @@
}
}
- private boolean containsOnlyNulls(RspList l)
- {
- for (Rsp r : l.values())
- {
+ private boolean containsOnlyNulls(RspList l) {
+ for (Rsp r : l.values()) {
if (r.getValue() != null || !r.wasReceived() || r.wasSuspected()) return false;
}
return true;
@@ -235,139 +200,84 @@
* Message contains a Command. Execute it against *this* object and return result.
*/
@Override
- public Object handle(Message req)
- {
- if (isValid(req))
- {
- try
- {
+ public Object handle(Message req) {
+ if (isValid(req)) {
+ try {
ReplicableCommand command = (ReplicableCommand)
req_marshaller.objectFromByteBuffer(req.getBuffer(), req.getOffset(), req.getLength());
Object execResult = executeCommand(command, req);
if (log.isTraceEnabled()) log.trace("Command : " + command + "
executed, result is: " + execResult);
return execResult;
}
- catch (Throwable x)
- {
+ catch (Throwable x) {
if (trace) log.trace("Problems invoking command.", x);
return x;
}
- }
- else
- {
+ } else {
return null;
}
}
- protected Object executeCommand(ReplicableCommand cmd, Message req) throws Throwable
- {
- boolean unlock = false;
- FlushTracker flushTracker = rpcManager.getFlushTracker();
-
- try
- {
+ protected Object executeCommand(ReplicableCommand cmd, Message req) throws Throwable
{
+ try {
if (cmd == null) throw new NullPointerException("Unable to execute a null
command! Message was " + req);
if (trace) log.trace("Executing command: " + cmd + "
[sender=" + req.getSrc() + "]");
- boolean replayIgnored = false;
-
-
- if (configuration.isNonBlockingStateTransfer())
- {
- int flushCount = flushTracker.getFlushCompletionCount();
- flushTracker.lockProcessingLock();
- unlock = true;
-
-
flushTracker.waitForFlushCompletion(configuration.getStateRetrievalTimeout());
-
- // If this thread blocked during a NBST flush, then inform the sender
- // it needs to replay ignored messages
- replayIgnored = flushTracker.getFlushCompletionCount() != flushCount;
- }
-
- Object ret;
-
- if (cmd instanceof VisitableCommand)
- {
+ if (cmd instanceof VisitableCommand) {
InvocationContext ctx = invocationContextContainer.get();
ctx.setOriginLocal(false);
- if (!componentRegistry.invocationsAllowed(false))
- {
- return new RequestIgnoredResponse();
+ if (!componentRegistry.invocationsAllowed(false)) {
+ return null;
}
- ret = interceptorChain.invoke(ctx, (VisitableCommand) cmd);
- }
- else
- {
+ return interceptorChain.invoke(ctx, (VisitableCommand) cmd);
+ } else {
if (trace) log.trace("This is a non-visitable command - so performing
directly and not via the invoker.");
// need to check cache status for all except buddy replication commands.
if (!(cmd instanceof AnnounceBuddyPoolNameCommand ||
cmd instanceof AssignToBuddyGroupCommand ||
cmd instanceof RemoveFromBuddyGroupCommand)
- && !componentRegistry.invocationsAllowed(false))
- {
- return new RequestIgnoredResponse();
+ && !componentRegistry.invocationsAllowed(false)) {
+ return null;
}
- ret = cmd.perform(null);
+ return cmd.perform(null);
}
-
- if (replayIgnored)
- {
- ExtendedResponse extended = new ExtendedResponse(ret);
- extended.setReplayIgnoredRequests(true);
- ret = extended;
- }
-
- return ret;
}
- finally
- {
+ finally {
if (replicationObserver != null)
replicationObserver.afterExecutingCommand(cmd);
-
- if (unlock)
- flushTracker.unlockProcessingLock();
}
}
@Override
- public String toString()
- {
+ public String toString() {
return getClass().getSimpleName() + "[Outgoing marshaller: " +
req_marshaller + "; incoming marshaller: " + rsp_marshaller + "]";
}
- private class ReplicationTask implements Callable<RspList>
- {
+ private class ReplicationTask implements Callable<RspList> {
private ReplicableCommand command;
private boolean oob;
private Vector<Address> dests;
private int mode;
private long timeout;
private boolean anycasting;
- private boolean supportReplay;
private RspFilter filter;
- private ReplicationTask(ReplicableCommand command, boolean oob,
Vector<Address> dests, int mode, long timeout, boolean anycasting, boolean
supportReplay, RspFilter filter)
- {
+ private ReplicationTask(ReplicableCommand command, boolean oob,
Vector<Address> dests, int mode, long timeout, boolean anycasting, RspFilter filter)
{
this.command = command;
this.oob = oob;
this.dests = dests;
this.mode = mode;
this.timeout = timeout;
this.anycasting = anycasting;
- this.supportReplay = supportReplay;
this.filter = filter;
}
- public RspList call() throws Exception
- {
+ public RspList call() throws Exception {
Buffer buf;
- try
- {
+ try {
buf = req_marshaller.objectToBuffer(command);
}
- catch (Exception e)
- {
+ catch (Exception e) {
if (log.isErrorEnabled()) log.error(e);
throw new RuntimeException("Failure to marshal argument(s)", e);
}
@@ -377,7 +287,6 @@
if (oob) msg.setFlag(Message.OOB);
// Replay capability requires responses from all members!
- int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
if (trace) log.trace("responses: " + retval);
@@ -388,35 +297,6 @@
if (retval == null)
throw new NotSerializableException("RpcDispatcher returned a null. This
is most often caused by args for " + command.getClass().getSimpleName() + " not
being serializable.");
- if (supportReplay)
- {
- boolean replay = false;
- Vector<Address> ignorers = new Vector<Address>();
- for (Map.Entry<Address, Rsp> entry : retval.entrySet())
- {
- Object value = entry.getValue().getValue();
- if (value instanceof RequestIgnoredResponse)
- {
- ignorers.add(entry.getKey());
- }
- else if (value instanceof ExtendedResponse)
- {
- ExtendedResponse extended = (ExtendedResponse) value;
- replay |= extended.isReplayIgnoredRequests();
- entry.getValue().setValue(extended.getResponse());
- }
- }
-
- if (replay && ignorers.size() > 0)
- {
- if (trace)
- log.trace("Replaying message to ignoring senders: " +
ignorers);
- RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL,
timeout, anycasting, filter);
- if (responses != null)
- retval.putAll(responses);
- }
- }
-
return retval;
}
}
Deleted: core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/ExtendedResponse.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -1,53 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
-*/
-package org.jboss.cache.marshall;
-
-/**
- * A response with extended information
- *
- * @author Jason T. Greene
- */
-public class ExtendedResponse
-{
- private boolean replayIgnoredRequests;
- private final Object response;
-
- public ExtendedResponse(Object response)
- {
- this.response = response;
- }
-
- public boolean isReplayIgnoredRequests()
- {
- return replayIgnoredRequests;
- }
-
- public void setReplayIgnoredRequests(boolean replayIgnoredRequests)
- {
- this.replayIgnoredRequests = replayIgnoredRequests;
- }
-
- public Object getResponse()
- {
- return response;
- }
-}
Modified:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -21,12 +21,10 @@
*/
package org.jboss.cache.marshall;
-import org.jboss.cache.RPCManager;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
import org.jboss.cache.invocation.InvocationContextContainer;
-import org.jboss.cache.util.concurrent.ReclosableLatch;
import org.jgroups.Channel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
@@ -34,13 +32,13 @@
import org.jgroups.blocks.RpcDispatcher;
/**
- * Extends {@link org.jgroups.blocks.RpcDispatcher} and adds the possibility that the
marshaller may throw {@link org.jboss.cache.marshall.InactiveRegionException}s.
+ * Extends {@link org.jgroups.blocks.RpcDispatcher} and adds the possibility that the
marshaller may throw {@link
+ * org.jboss.cache.marshall.InactiveRegionException}s.
*
* @author <a href="mailto:manik AT jboss DOT org">Manik
Surtani</a>
* @since 2.0.0
*/
-public class InactiveRegionAwareRpcDispatcher extends CommandAwareRpcDispatcher
-{
+public class InactiveRegionAwareRpcDispatcher extends CommandAwareRpcDispatcher {
org.jboss.cache.marshall.Marshaller requestMarshaller;
/**
@@ -48,41 +46,34 @@
*/
public InactiveRegionAwareRpcDispatcher(Channel channel, MessageListener l,
MembershipListener l2, Object serverObj,
InvocationContextContainer container,
InterceptorChain interceptorChain,
- ComponentRegistry componentRegistry,
RPCManager manager)
- {
- super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry,
manager);
+ ComponentRegistry componentRegistry) {
+ super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry);
}
@Override
- public void setRequestMarshaller(Marshaller m)
- {
+ public void setRequestMarshaller(Marshaller m) {
super.setRequestMarshaller(m);
requestMarshaller = (org.jboss.cache.marshall.Marshaller) m;
}
/**
- * Message contains MethodCall. Execute it against *this* object and return result.
- * Use MethodCall.invoke() to do this. Return result.
+ * Message contains MethodCall. Execute it against *this* object and return result.
Use MethodCall.invoke() to do
+ * this. Return result.
*/
@Override
- public Object handle(Message req)
- {
- if (isValid(req))
- {
+ public Object handle(Message req) {
+ if (isValid(req)) {
RegionalizedMethodCall rmc;
ReplicableCommand command;
- try
- {
+ try {
// we will ALWAYS be using the marshaller to unmarshall requests.
rmc =
requestMarshaller.regionalizedMethodCallFromByteBuffer(req.getBuffer());
command = rmc.command;
}
- catch (Throwable e)
- {
- if (e instanceof InactiveRegionException)
- {
+ catch (Throwable e) {
+ if (e instanceof InactiveRegionException) {
if (trace) log.trace("Exception from marshaller: " +
e.getMessage());
return null;
}
@@ -91,19 +82,15 @@
return e;
}
- try
- {
+ try {
Object retVal = executeCommand(command, req);
return new RegionalizedReturnValue(retVal, rmc);
}
- catch (Throwable x)
- {
+ catch (Throwable x) {
if (trace) log.trace("Problems invoking command", x);
return x;
}
- }
- else
- {
+ } else {
return null;
}
}
Deleted:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/marshall/RequestIgnoredResponse.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -1,32 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
-*/
-package org.jboss.cache.marshall;
-
-/**
- * Indicates that the request was ignored,
- *
- * @author Jason T. Greene
- */
-public class RequestIgnoredResponse
-{
-
-}
Modified:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -21,215 +21,107 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InternalNode;
import org.jboss.cache.Node;
-import org.jboss.cache.RPCManager;
import org.jboss.cache.Version;
-import org.jboss.cache.RPCManagerImpl.FlushTracker;
-import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.marshall.NodeDataExceptionMarker;
-import org.jboss.cache.transaction.TransactionLog;
-public class DefaultStateTransferGenerator implements StateTransferGenerator
-{
+import java.io.ObjectOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+public class DefaultStateTransferGenerator implements StateTransferGenerator {
+
public static final short STATE_TRANSFER_VERSION =
Version.getVersionShort("2.0.0.GA");
private Log log = LogFactory.getLog(getClass().getName());
private CacheSPI cache;
- private RPCManager rpcManager;
private Set<Fqn> internalFqns;
- private boolean nonBlocking;
- private long flushTimeout;
- private int maxNonProgressingLogWrites = 5;
- private TransactionLog txLog;
-
-
-
@Inject
- public void inject(CacheSPI cache, RPCManager rpcManager, Configuration configuration,
TransactionLog txLog)
- {
+ public void inject(CacheSPI cache) {
this.cache = cache;
- this.nonBlocking = true;
-
- this.flushTimeout = configuration.getStateRetrievalTimeout();
- this.nonBlocking = configuration.isNonBlockingStateTransfer();
- this.txLog = txLog;
- this.rpcManager = rpcManager;
}
@Start(priority = 14)
- void start()
- {
+ void start() {
this.internalFqns = cache.getInternalFqns();
}
public void generateState(ObjectOutputStream out, Object rootNode, boolean
generateTransient,
- boolean generatePersistent, boolean suppressErrors) throws
Exception
- {
+ boolean generatePersistent, boolean suppressErrors) throws
Exception {
Fqn fqn = getFqn(rootNode);
- try
- {
+ try {
cache.getMarshaller().objectToObjectStream(STATE_TRANSFER_VERSION, out);
- if (generateTransient)
- {
- if (nonBlocking)
- {
- if (! txLog.activate())
- throw new CacheException("Busy performing state transfer for
someone else");
-
- if (log.isTraceEnabled())
- log.trace("Transaction log activated!");
-
- }
-
+ if (generateTransient) {
//transient + marker
- if (log.isTraceEnabled())
- {
+ if (log.isTraceEnabled()) {
log.trace("writing transient state for " + fqn);
}
marshallTransientState((InternalNode) rootNode, out);
- if (log.isTraceEnabled())
- {
+ if (log.isTraceEnabled()) {
log.trace("transient state succesfully written");
}
//associated + marker
- if (log.isTraceEnabled())
- {
+ if (log.isTraceEnabled()) {
log.trace("writing associated state");
}
delimitStream(out);
- if (log.isTraceEnabled())
- {
+ if (log.isTraceEnabled()) {
log.trace("associated state succesfully written");
}
- }
- else
- {
+ } else {
//we have to write two markers for transient and associated
delimitStream(out);
delimitStream(out);
}
CacheLoader cacheLoader = cache.getCacheLoaderManager() == null ? null :
cache.getCacheLoaderManager().getCacheLoader();
- if (cacheLoader != null && generatePersistent)
- {
+ if (cacheLoader != null && generatePersistent) {
writePersistentData(out, fqn, cacheLoader);
}
delimitStream(out);
-
- if (nonBlocking && generateTransient)
- {
- writeTxLog(out);
- }
-
}
- catch (Exception e)
- {
+ catch (Exception e) {
cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(e,
cache.getLocalAddress()), out);
throw e;
}
- finally
- {
- if (nonBlocking)
- txLog.deactivate();
- }
}
- private void writePersistentData(ObjectOutputStream out, Fqn fqn, CacheLoader
cacheLoader) throws Exception
- {
- if (log.isTraceEnabled())
- {
+ private void writePersistentData(ObjectOutputStream out, Fqn fqn, CacheLoader
cacheLoader) throws Exception {
+ if (log.isTraceEnabled()) {
log.trace("writing persistent state for " + fqn + ",using "
+ cache.getCacheLoaderManager().getCacheLoader().getClass());
}
- if (fqn.isRoot())
- {
+ if (fqn.isRoot()) {
cacheLoader.loadEntireState(out);
- }
- else
- {
+ } else {
cacheLoader.loadState(fqn, out);
}
- if (log.isTraceEnabled())
- {
+ if (log.isTraceEnabled()) {
log.trace("persistent state succesfully written");
}
}
- private void writeTxLog(ObjectOutputStream out) throws Exception
- {
- FlushTracker flushTracker = rpcManager.getFlushTracker();
-
- try
- {
- for (int nonProgress = 0, size = txLog.size(); size > 0;)
- {
- if (log.isTraceEnabled())
- log.trace("Tx Log remaining entries = " + size);
- txLog.writeCommitLog(cache.getMarshaller(), out);
- int newSize = txLog.size();
-
- // If size did not decrease then we did not make progress, and could be
wasting
- // our time. Limit this to the specified max.
- if (newSize >= size && ++nonProgress >=
maxNonProgressingLogWrites)
- break;
-
- size = newSize;
- }
-
- // Wait on incoming and outgoing threads to line-up in front of
- // the flush gate.
- flushTracker.lockSuspendProcessingLock();
-
- // Signal to sender that we need a flush to get a consistent view
- // of the remaining transactions.
- delimitStream(out);
- out.flush();
- flushTracker.waitForFlushStart(flushTimeout);
-
- // Write remaining transactions
- txLog.writeCommitLog(cache.getMarshaller(), out);
- delimitStream(out);
-
- // Write all non-completed prepares
- txLog.writePendingPrepares(cache.getMarshaller(), out);
- delimitStream(out);
- out.flush();
- }
- finally
- {
- flushTracker.unlockSuspendProcessingLock();
- }
- }
-
- private Fqn getFqn(Object o)
- {
+ private Fqn getFqn(Object o) {
if (o instanceof Node) return ((Node) o).getFqn();
if (o instanceof InternalNode) return ((InternalNode) o).getFqn();
throw new IllegalArgumentException();
@@ -241,8 +133,7 @@
* @param out stream
* @throws java.io.IOException if there are errs
*/
- protected void delimitStream(ObjectOutputStream out) throws Exception
- {
+ protected void delimitStream(ObjectOutputStream out) throws Exception {
cache.getMarshaller().objectToObjectStream(DefaultStateTransferManager.STREAMING_DELIMITER_NODE,
out);
}
@@ -252,17 +143,14 @@
* @param out
* @throws Exception
*/
- protected void marshallTransientState(InternalNode node, ObjectOutputStream out)
throws Exception
- {
+ protected void marshallTransientState(InternalNode node, ObjectOutputStream out)
throws Exception {
List<NodeData> nodeData = new LinkedList<NodeData>();
generateNodeDataList(node, nodeData);
cache.getMarshaller().objectToObjectStream(nodeData, out, node.getFqn());
}
- protected void generateNodeDataList(InternalNode<?, ?> node,
List<NodeData> list) throws Exception
- {
- if (internalFqns.contains(node.getFqn()))
- {
+ protected void generateNodeDataList(InternalNode<?, ?> node,
List<NodeData> list) throws Exception {
+ if (internalFqns.contains(node.getFqn())) {
return;
}
@@ -272,12 +160,9 @@
// first handle the current node
attrs = node.getInternalState(false);
- if (attrs.size() == 0)
- {
+ if (attrs.size() == 0) {
nd = new NodeData(node.getFqn());
- }
- else
- {
+ } else {
nd = new NodeData(node.getFqn(), attrs, true);
}
Modified:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -21,16 +21,6 @@
*/
package org.jboss.cache.statetransfer;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
@@ -40,30 +30,28 @@
import org.jboss.cache.InvocationContext;
import org.jboss.cache.Node;
import org.jboss.cache.NodeSPI;
-import org.jboss.cache.RPCManager;
import org.jboss.cache.buddyreplication.BuddyManager;
-import org.jboss.cache.commands.WriteCommand;
-import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.interceptors.InterceptorChain;
-import org.jboss.cache.invocation.InvocationContextContainer;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.marshall.NodeDataExceptionMarker;
import org.jboss.cache.marshall.NodeDataMarker;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
-import org.jboss.cache.transaction.TransactionLog;
-import org.jboss.cache.transaction.TransactionLog.LogEntry;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-public class DefaultStateTransferIntegrator implements StateTransferIntegrator
-{
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+public class DefaultStateTransferIntegrator implements StateTransferIntegrator {
+
private static final Log log =
LogFactory.getLog(DefaultStateTransferIntegrator.class);
private static final boolean trace = log.isTraceEnabled();
@@ -71,151 +59,49 @@
private Set<Fqn> internalFqns;
private Configuration cfg;
- private RPCManager manager;
- private TransactionLog txLog;
private boolean needToPersistState; // for JBCACHE-131
- private boolean nonBlocking;
- private InvocationContextContainer container;
- private InterceptorChain chain;
- private ComponentRegistry registry;
@Inject
- public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager
rpcManager, TransactionLog txLog, InvocationContextContainer container, InterceptorChain
chain, ComponentRegistry registry)
- {
+ public void inject(CacheSPI<?, ?> cache, Configuration cfg) {
this.cache = cache;
this.cfg = cfg;
- this.manager = rpcManager;
- this.nonBlocking = cfg.isNonBlockingStateTransfer();
- this.txLog = txLog;
- this.container = container;
- this.chain = chain;
- this.registry = registry;
}
@Start(priority = 14)
- public void start()
- {
+ public void start() {
this.internalFqns = cache.getInternalFqns();
needToPersistState = cfg.getCacheLoaderConfig() != null &&
!cfg.getCacheLoaderConfig().isFetchPersistentState() &&
!cfg.getCacheLoaderConfig().isShared();
}
- public void integrateState(ObjectInputStream ois, Object target, Fqn targetRoot,
boolean integratePersistentState) throws Exception
- {
+ public void integrateState(ObjectInputStream ois, Object target, Fqn targetRoot,
boolean integratePersistentState) throws Exception {
// pop version from the stream first!
short version = (Short) cache.getMarshaller().objectFromObjectStream(ois);
log.info("Using version " + version);
integrateTransientState(ois, (InternalNode) target);
if (trace) log.trace("Reading marker for nonexistent associated state");
cache.getMarshaller().objectFromObjectStream(ois);
- if (integratePersistentState)
- {
+ if (integratePersistentState) {
integratePersistentState(ois, targetRoot);
}
// Delimiter
verifyMarker(cache.getMarshaller().objectFromObjectStream(ois));
-
- if (nonBlocking)
- integrateTxLog(ois);
}
- private void integrateTxLog(ObjectInputStream ois) throws Exception
- {
- if (trace)
- log.trace("Integrating transaction log");
-
- processCommitLog(ois);
-
- Channel channel = manager.getChannel();
-
- List<Address> targets = new ArrayList<Address>(2);
- targets.add(channel.getLocalAddress());
- targets.add(manager.getLastStateTransferSource());
-
- if (trace)
- log.trace("Flushing targets: " + targets);
-
- if (!channel.startFlush(targets, false))
- throw new CacheException("Could not flush channel! State-transfer
failed!");
-
- try
- {
- if (trace)
- log.trace("Retrieving/Applying post-flush commits");
- processCommitLog(ois);
-
- if (trace)
- log.trace("Retrieving/Applying pending prepares");
- Object object = cache.getMarshaller().objectFromObjectStream(ois);
- while (object instanceof PrepareCommand)
- {
- PrepareCommand command = (PrepareCommand)object;
- if (! txLog.hasPendingPrepare(command))
- {
- InvocationContext ctx = container.get();
- ctx.setOriginLocal(false);
- ctx.getOptionOverrides().setCacheModeLocal(true);
- ctx.getOptionOverrides().setSkipCacheStatusCheck(true);
- chain.invoke(ctx, command);
- }
- object = cache.getMarshaller().objectFromObjectStream(ois);
- }
- verifyMarker(object);
-
- // Block all remote commands once transfer is complete,
- // and before FLUSH completes
- registry.setBlockInStarting(true);
- }
- finally
- {
- if (trace)
- log.trace("Stopping flush");
- channel.stopFlush(targets);
- }
- }
-
- private void processCommitLog(ObjectInputStream ois) throws Exception
- {
- Object object = cache.getMarshaller().objectFromObjectStream(ois);
- while (object instanceof LogEntry)
- {
- List<WriteCommand> mods = ((LogEntry)object).getModifications();
- log.trace("Mods = " + mods);
- for (WriteCommand mod : mods)
- {
- InvocationContext ctx = container.get();
- ctx.setOriginLocal(false);
- ctx.getOptionOverrides().setCacheModeLocal(true);
- ctx.getOptionOverrides().setSkipCacheStatusCheck(true);
- chain.invoke(ctx, mod);
- }
-
- object = cache.getMarshaller().objectFromObjectStream(ois);
- }
- verifyMarker(object);
- }
-
- private void verifyMarker(Object object)
- {
- if (object instanceof NodeDataExceptionMarker)
- {
- NodeDataExceptionMarker e = (NodeDataExceptionMarker)object;
+ private void verifyMarker(Object object) {
+ if (object instanceof NodeDataExceptionMarker) {
+ NodeDataExceptionMarker e = (NodeDataExceptionMarker) object;
throw new CacheException("Error in state transfer stream",
e.getCause());
- }
- else if (! (object instanceof NodeDataMarker))
- {
+ } else if (!(object instanceof NodeDataMarker)) {
throw new CacheException("Invalid object unmarshalled");
}
}
- protected void integrateTransientState(ObjectInputStream in, InternalNode target)
throws Exception
- {
+ protected void integrateTransientState(ObjectInputStream in, InternalNode target)
throws Exception {
boolean transientSet = false;
- try
- {
- if (trace)
- {
+ try {
+ if (trace) {
log.trace("integrating transient state for " + target);
}
@@ -223,80 +109,59 @@
transientSet = true;
- if (trace)
- {
+ if (trace) {
log.trace("transient state successfully integrated");
}
notifyAllNodesCreated(cache.getInvocationContext(), target);
}
- catch (CacheException ce)
- {
+ catch (CacheException ce) {
throw ce;
}
- catch (Exception e)
- {
+ catch (Exception e) {
throw new CacheException(e);
}
- finally
- {
- if (!transientSet)
- {
+ finally {
+ if (!transientSet) {
target.clear();
target.removeChildren();
}
}
}
- protected void integratePersistentState(ObjectInputStream in, Fqn targetFqn) throws
Exception
- {
+ protected void integratePersistentState(ObjectInputStream in, Fqn targetFqn) throws
Exception {
CacheLoaderManager loaderManager = cache.getCacheLoaderManager();
CacheLoader loader = loaderManager == null ? null :
loaderManager.getCacheLoader();
- if (loader == null)
- {
- if (trace)
- {
+ if (loader == null) {
+ if (trace) {
log.trace("cache loader is null, will not attempt to integrate
persistent state");
}
- }
- else
- {
+ } else {
- if (trace)
- {
+ if (trace) {
log.trace("integrating persistent state using " +
loader.getClass().getName());
}
boolean persistentSet = false;
- try
- {
- if (targetFqn.isRoot())
- {
+ try {
+ if (targetFqn.isRoot()) {
loader.storeEntireState(in);
- }
- else
- {
+ } else {
loader.storeState(targetFqn, in);
}
persistentSet = true;
}
- catch (ClassCastException cce)
- {
+ catch (ClassCastException cce) {
log.error("Failed integrating persistent state. One of cacheloaders is
not"
+ " adhering to state stream format. See JBCACHE-738.");
throw cce;
}
- finally
- {
- if (!persistentSet)
- {
+ finally {
+ if (!persistentSet) {
log.warn("persistent state integration failed, removing all nodes
from loader");
loader.remove(targetFqn);
- }
- else
- {
- if (trace)
- {
+ } else {
+ if (trace) {
log.trace("persistent state integrated successfully");
}
}
@@ -305,18 +170,16 @@
}
/**
- * Generates NodeAdded notifications for all nodes of the tree. This is
- * called whenever the tree is initially retrieved (state transfer)
+ * Generates NodeAdded notifications for all nodes of the tree. This is called
whenever the tree is initially
+ * retrieved (state transfer)
*/
- private void notifyAllNodesCreated(InvocationContext ctx, InternalNode curr)
- {
+ private void notifyAllNodesCreated(InvocationContext ctx, InternalNode curr) {
if (curr == null) return;
ctx.setOriginLocal(false);
cache.getNotifier().notifyNodeCreated(curr.getFqn(), true, ctx);
cache.getNotifier().notifyNodeCreated(curr.getFqn(), false, ctx);
// AND notify that they have been modified!!
- if (!curr.getKeys().isEmpty())
- {
+ if (!curr.getKeys().isEmpty()) {
cache.getNotifier().notifyNodeModified(curr.getFqn(), true,
NodeModifiedEvent.ModificationType.PUT_MAP, Collections.emptyMap(), ctx);
cache.getNotifier().notifyNodeModified(curr.getFqn(), false,
NodeModifiedEvent.ModificationType.PUT_MAP, curr.getData(), ctx);
}
@@ -326,19 +189,16 @@
for (InternalNode n : children) notifyAllNodesCreated(ctx, n);
}
- private void prepareContextOptions()
- {
+ private void prepareContextOptions() {
cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
cache.getInvocationContext().getOptionOverrides().setSuppressPersistence(!needToPersistState);
}
- private void integrateTransientState(Fqn target, ObjectInputStream in) throws
Exception
- {
+ private void integrateTransientState(Fqn target, ObjectInputStream in) throws
Exception {
prepareContextOptions();
NodeSPI targetNode = cache.getNode(target);
- for (Object childname : targetNode.getChildrenNames())
- {
+ for (Object childname : targetNode.getChildrenNames()) {
prepareContextOptions();
targetNode.removeChild(childname);
}
@@ -348,19 +208,16 @@
targetNode.setChildrenLoaded(false);
List<NodeData> list = readNodesAsList(in);
- if (list != null)
- {
+ if (list != null) {
// if the list was null we read an EOF marker!! So don't bother popping it
off the stack later.
Iterator<NodeData> nodeDataIterator = list.iterator();
// Read the first NodeData and integrate into our target
- if (nodeDataIterator.hasNext())
- {
+ if (nodeDataIterator.hasNext()) {
NodeData nd = nodeDataIterator.next();
//are there any transient nodes at all?
- if (nd != null && !nd.isMarker())
- {
+ if (nd != null && !nd.isMarker()) {
// with MVCC these calls should ALWAYS go up the interceptor chain since
no other locking
// takes place elsewhere.
prepareContextOptions();
@@ -388,12 +245,10 @@
}
@SuppressWarnings("unchecked")
- private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception
- {
+ private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception {
Object obj = cache.getMarshaller().objectFromObjectStream(in);
- if (obj instanceof NodeDataExceptionMarker)
- {
- Throwable cause = ((NodeDataExceptionMarker)obj).getCause();
+ if (obj instanceof NodeDataExceptionMarker) {
+ Throwable cause = ((NodeDataExceptionMarker) obj).getCause();
if (cause instanceof Exception)
throw (Exception) cause;
@@ -405,29 +260,23 @@
}
private NodeData integrateStateTransferChildren(Fqn parentFqn, int offset,
Iterator<NodeData> nodeDataIterator)
- throws IOException, ClassNotFoundException
- {
+ throws IOException, ClassNotFoundException {
int parentLevel = parentFqn.size();
int targetLevel = parentLevel + 1;
Fqn fqn;
int size;
NodeData nd = nodeDataIterator.hasNext() ? nodeDataIterator.next() : null;
- while (nd != null && !nd.isMarker())
- {
+ while (nd != null && !nd.isMarker()) {
fqn = nd.getFqn();
// If we need to integrate into the buddy backup subtree,
// change the Fqn to fit under it
- if (offset > 0)
- {
+ if (offset > 0) {
fqn = Fqn.fromRelativeFqn(parentFqn.getAncestor(offset), fqn);
}
size = fqn.size();
- if (size <= parentLevel)
- {
+ if (size <= parentLevel) {
return nd;
- }
- else if (size > targetLevel)
- {
+ } else if (size > targetLevel) {
throw new IllegalStateException("NodeData " + fqn + " is not a
direct child of " + parentFqn);
}
@@ -444,8 +293,7 @@
// and return the next NodeData that's a child of our parent
nd = integrateStateTransferChildren(fqn, offset, nodeDataIterator);
}
- if (nd != null && nd.isExceptionMarker())
- {
+ if (nd != null && nd.isExceptionMarker()) {
NodeDataExceptionMarker ndem = (NodeDataExceptionMarker) nd;
throw new CacheException("State provider node " +
ndem.getCacheNodeIdentity()
+ " threw exception during loadState", ndem.getCause());
@@ -453,17 +301,13 @@
return null;
}
- private Set<Fqn> retainInternalNodes(Fqn target)
- {
+ private Set<Fqn> retainInternalNodes(Fqn target) {
Set<Fqn> result = new HashSet<Fqn>();
- for (Fqn internalFqn : internalFqns)
- {
- if (internalFqn.isChildOf(target))
- {
+ for (Fqn internalFqn : internalFqns) {
+ if (internalFqn.isChildOf(target)) {
prepareContextOptions();
Node node = getInternalNode(cache.getNode(target), internalFqn);
- if (node != null)
- {
+ if (node != null) {
result.add(node.getFqn());
}
}
@@ -472,14 +316,12 @@
return result;
}
- private Node getInternalNode(Node parentNode, Fqn internalFqn)
- {
+ private Node getInternalNode(Node parentNode, Fqn internalFqn) {
Fqn parentFqn = parentNode.getFqn();
Object name = internalFqn.get(parentFqn.size());
prepareContextOptions();
Node result = parentNode.getChild(name);
- if (result != null && internalFqn.size() < result.getFqn().size())
- {
+ if (result != null && internalFqn.size() < result.getFqn().size()) {
// need to recursively walk down the tree
result = getInternalNode(result, internalFqn);
}
@@ -487,13 +329,10 @@
return result;
}
- private void integrateRetainedNodes(Fqn target)
- {
+ private void integrateRetainedNodes(Fqn target) {
Set<Fqn> retainedNodes = retainInternalNodes(target);
- for (Fqn retained : retainedNodes)
- {
- if (retained.isChildOf(target))
- {
+ for (Fqn retained : retainedNodes) {
+ if (retained.isChildOf(target)) {
integrateRetainedNode(target, retained);
}
}
@@ -501,30 +340,22 @@
// TODO: What is this rubbish?!??
- private void integrateRetainedNode(Fqn ancFqn, Fqn descFqn)
- {
+ private void integrateRetainedNode(Fqn ancFqn, Fqn descFqn) {
prepareContextOptions();
InternalNode ancestor = cache.getNode(ancFqn).getDelegationTarget();
Object name = descFqn.get(ancFqn.size());
InternalNode child = ancestor.getChild(name);
- if (ancFqn.size() == descFqn.size() + 1)
- {
- if (child == null)
- {
+ if (ancFqn.size() == descFqn.size() + 1) {
+ if (child == null) {
prepareContextOptions();
InternalNode descendant = cache.getNode(descFqn).getDelegationTarget();
prepareContextOptions();
ancestor.addChild(name, descendant);
- }
- else
- {
+ } else {
log.warn("Received unexpected internal node " + descFqn + " in
transferred state");
}
- }
- else
- {
- if (child == null)
- {
+ } else {
+ if (child == null) {
// Missing level -- have to create empty node
// This shouldn't really happen -- internal fqns should
// be immediately under the root
Deleted:
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/transaction/TransactionLog.java
===================================================================
---
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/transaction/TransactionLog.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/main/java/org/jboss/cache/transaction/TransactionLog.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -1,181 +0,0 @@
-/*
-* JBoss, Home of Professional Open Source
-* Copyright 2005, JBoss Inc., and individual contributors as indicated
-* by the @authors tag. See the copyright.txt in the distribution for a
-* full listing of individual contributors.
-*
-* This is free software; you can redistribute it and/or modify it
-* under the terms of the GNU Lesser General Public License as
-* published by the Free Software Foundation; either version 2.1 of
-* the License, or (at your option) any later version.
-*
-* This software is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
-* Lesser General Public License for more details.
-*
-* You should have received a copy of the GNU Lesser General Public
-* License along with this software; if not, write to the Free
-* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
-* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
-*/
-package org.jboss.cache.transaction;
-
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.commands.WriteCommand;
-import org.jboss.cache.commands.tx.PrepareCommand;
-import org.jboss.cache.marshall.Marshaller;
-
-/**
- * Logs transactions and writes for Non-Blocking State Transfer
- *
- * @author Jason T. Greene
- */
-public class TransactionLog
-{
- private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new
ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
- private final BlockingQueue<LogEntry> entries = new
LinkedBlockingQueue<LogEntry>();
- private AtomicBoolean active = new AtomicBoolean();
-
- public static class LogEntry
- {
- private final GlobalTransaction transaction;
- private final List<WriteCommand> modifications;
-
- public LogEntry(GlobalTransaction transaction, List<WriteCommand>
modifications)
- {
- this.transaction = transaction;
- this.modifications = modifications;
- }
-
- public GlobalTransaction getTransaction()
- {
- return transaction;
- }
-
- public List<WriteCommand> getModifications()
- {
- return modifications;
- }
- }
-
- private static Log log = LogFactory.getLog(TransactionLog.class);
-
- public void logPrepare(PrepareCommand command)
- {
- pendingPrepares.put(command.getGlobalTransaction(), command);
- }
-
- public void logCommit(GlobalTransaction gtx)
- {
- PrepareCommand command = pendingPrepares.remove(gtx);
- if (command == null)
- {
- log.error("Could not find matching prepare for commit: " + gtx);
- return;
- }
-
- addEntry(new LogEntry(gtx, command.getModifications()));
- }
-
- private void addEntry(LogEntry entry)
- {
- if (! isActive())
- return;
-
- for (;;)
- {
- try
- {
- if (log.isTraceEnabled())
- log.trace("Added commit entry to tx log" + entry);
-
- entries.put(entry);
- break;
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand>
modifications)
- {
- // Just in case...
- if (gtx != null) pendingPrepares.remove(gtx);
- addEntry(new LogEntry(gtx, modifications));
- }
-
- public void logNoTxWrite(WriteCommand write)
- {
- if (! isActive())
- return;
-
- ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
- list.add(write);
- addEntry(new LogEntry(null, list));
- }
-
- public void rollback(GlobalTransaction gtx)
- {
- pendingPrepares.remove(gtx);
- }
-
- public boolean isActive()
- {
- return active.get();
- }
-
- public boolean activate()
- {
- return active.compareAndSet(false, true);
- }
-
- public void deactivate()
- {
- active.set(false);
- if (entries.size() > 0)
- log.error("Unprocessed Transaction Log Entries! = " +
entries.size());
- entries.clear();
- }
-
- public int size()
- {
- return entries.size();
- }
-
- public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws
Exception
- {
- List<LogEntry> buffer = new ArrayList<LogEntry>(10);
-
- while (entries.drainTo(buffer, 10) > 0)
- {
- for (LogEntry entry : buffer)
- marshaller.objectToObjectStream(entry, out);
-
- buffer.clear();
- }
- }
-
- public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws
Exception
- {
- for (PrepareCommand entry : pendingPrepares.values())
- marshaller.objectToObjectStream(entry, out);
- }
-
- public boolean hasPendingPrepare(PrepareCommand command)
- {
- return pendingPrepares.containsKey(command.getGlobalTransaction());
- }
-}
Deleted:
core/tags/3.0.3.CR1/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
===================================================================
---
core/tags/3.0.3.CR1/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java 2009-02-13
13:43:13 UTC (rev 7686)
+++
core/tags/3.0.3.CR1/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java 2009-02-13
14:43:12 UTC (rev 7687)
@@ -1,347 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at
gnu.org.
- */
-
-package org.jboss.cache.statetransfer;
-
-import static org.testng.AssertJUnit.assertEquals;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.Cache;
-import org.jboss.cache.CacheSPI;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.UnitTestCacheFactory;
-import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.Configuration.CacheMode;
-import org.jboss.cache.factories.UnitTestConfigurationFactory;
-import org.jboss.cache.util.TestingUtil;
-import org.testng.annotations.Test;
-
-@Test(groups="functional", testName =
"statetransfer.NonBlockingStateTransferTest")
-public class NonBlockingStateTransferTest
-{
- public static final Fqn A = Fqn.fromString("/a");
- public static final Fqn B = Fqn.fromString("/b");
- public static final Fqn C = Fqn.fromString("/c");
- protected static final String ADDRESS_CLASSNAME =
"org.jboss.cache.marshall.data.Address";
- protected static final String PERSON_CLASSNAME =
"org.jboss.cache.marshall.data.Person";
- public static final Fqn A_B = Fqn.fromString("/a/b");
- public static final Fqn A_C = Fqn.fromString("/a/c");
- public static final Fqn A_D = Fqn.fromString("/a/d");
- public static final String JOE = "JOE";
- public static final String BOB = "BOB";
- public static final String JANE = "JANE";
- public static final Integer TWENTY = 20;
- public static final Integer FORTY = 40;
-
- private volatile int testCount = 0;
-
- private static final Log log = LogFactory.getLog(NonBlockingStateTransferTest.class);
-
- public static class DelayTransfer implements Serializable
- {
- private transient int count;
-
- private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException
- {
- in.defaultReadObject();
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException
- {
- out.defaultWriteObject();
-
- // RPC is first serialization, ST is second
- if (count++ == 0)
- return;
-
- try
- {
- // This sleep is not required for the test to function,
- // however it improves the possibility of finding errors
- // (since it keeps the tx log going)
- Thread.sleep(2000);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- }
- private static class WritingRunner implements Runnable
- {
- private final Cache<Object,Object> cache;
- private final boolean tx;
- private volatile boolean stop;
- private volatile int result;
-
- WritingRunner(Cache<Object, Object> cache, boolean tx)
- {
- this.cache = cache;
- this.tx = tx;
- }
-
- public int result()
- {
- return result;
- }
-
- public void run()
- {
- int c = 0;
- while (!stop)
- {
- try
- {
- if (tx)
-
cache.getConfiguration().getRuntimeConfig().getTransactionManager().begin();
- cache.put("/test" + c, "test", c++);
- if (tx)
-
cache.getConfiguration().getRuntimeConfig().getTransactionManager().commit();
- }
- catch (Exception e)
- {
- e.printStackTrace();
- log.error(e);
- }
- }
- result = c;
- }
-
- public void stop()
- {
- stop = true;
- }
- }
-
- private CacheSPI<Object, Object> createCache(String name)
- {
- return createCache(name, true);
-
- }
-
- private CacheSPI<Object, Object> createCache(String name, boolean start)
- {
- Configuration config =
UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
- config.setSyncCommitPhase(true);
- config.setClusterName(name + "-" + Thread.currentThread().getName());
- config.setNonBlockingStateTransfer(true);
- config.setSyncReplTimeout(30000);
- CacheSPI<Object, Object> cache = (CacheSPI<Object, Object>) new
UnitTestCacheFactory<Object, Object>().createCache(config, false, getClass());
-
- cache.create();
- if (start)
- cache.start();
- return cache;
- }
-
- public void testInitialStateTransfer() throws Exception
- {
- testCount++;
- log.info("testInitialStateTransfer start - " + testCount);
- CacheSPI<Object, Object> cache1 = null, cache2 = null;
- try
- {
- cache1 = createCache("nbst");
- writeInitialData(cache1);
-
- cache2 = createCache("nbst");
-
- // Pause to give caches time to see each other
- TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
-
- verifyInitialData(cache2);
- }
- finally
- {
- TestingUtil.killCaches(cache1, cache2);
- }
- log.info("testInitialStateTransfer end - " + testCount);
- }
-
- public void testConcurrentStateTransfer() throws Exception
- {
- testCount++;
- log.info("testConcurrentStateTransfer start - " + testCount);
- CacheSPI<Object, Object> cache1 = null, cache2 = null, cache3 = null, cache4
= null;
- try
- {
- cache1 = createCache("nbst");
- writeInitialData(cache1);
-
- cache2 = createCache("nbst");
-
- cache1.put("/delay", "delay", new DelayTransfer());
-
- // Pause to give caches time to see each other
- TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
- verifyInitialData(cache2);
-
- final CacheSPI<Object, Object >c3 = cache3 = createCache("nbst",
false);
- final CacheSPI<Object, Object >c4 = cache4 = createCache("nbst",
false);
-
- Thread t1 = new Thread(new Runnable()
- {
- public void run()
- {
- c3.start();
- }
- });
- t1.start();
-
- Thread t2 = new Thread(new Runnable()
- {
- public void run()
- {
- c4.start();
- }
- });
- t2.start();
-
- t1.join();
- t2.join();
-
- TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2, cache3,
cache4 }, 60000);
- verifyInitialData(cache3);
- verifyInitialData(cache4);
- }
- finally
- {
- TestingUtil.killCaches(cache1, cache2, cache3, cache4);
- }
- log.info("testConcurrentStateTransfer end - " + testCount);
- }
-
- public void testSTWithThirdWritingNonTxCache() throws Exception
- {
- testCount++;
- log.info("testSTWithThirdWritingNonTxCache start - " + testCount);
- thirdWritingCacheTest(false, "nbst1");
- log.info("testSTWithThirdWritingNonTxCache end - " + testCount);
- }
-
- public void testSTWithThirdWritingTxCache() throws Exception
- {
- testCount++;
- log.info("testSTWithThirdWritingTxCache start - " + testCount);
- thirdWritingCacheTest(true, "nbst2");
- log.info("testSTWithThirdWritingTxCache end - " + testCount);
- }
-
- public void testSTWithWritingNonTxThread() throws Exception
- {
- testCount++;
- log.info("testSTWithWritingNonTxThread start - " + testCount);
- writingThreadTest(false, "nbst3");
- log.info("testSTWithWritingNonTxThread end - " + testCount);
- }
-
- public void testSTWithWritingTxThread() throws Exception
- {
- testCount++;
- log.info("testSTWithWritingTxThread start - " + testCount);
- writingThreadTest(true, "nbst4");
- log.info("testSTWithWritingTxThread end - " + testCount);
- }
-
- private void thirdWritingCacheTest(boolean tx, String name) throws
InterruptedException
- {
- CacheSPI<Object, Object> cache1 = null, cache2 = null, cache3 = null;
- try
- {
- cache1 = createCache(name);
- cache3 = createCache(name);
-
- writeInitialData(cache1);
-
- // Delay the transient copy, so that we get a more thorough log test
- cache1.put("/delay", "delay", new DelayTransfer());
-
- WritingRunner writer = new WritingRunner(cache3, tx);
- Thread writerThread = new Thread(writer);
- writerThread.start();
-
- cache2 = createCache(name);
-
- // Pause to give caches time to see each other
- TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2, cache3 },
60000);
-
- writer.stop();
- writerThread.join();
-
- verifyInitialData(cache2);
-
- int count = writer.result();
-
- for (int c = 0; c < count; c++)
- assertEquals(c, cache2.get("/test" + c, "test"));
- }
- finally
- {
- TestingUtil.killCaches(cache1, cache2, cache3);
- }
- }
-
- private void verifyInitialData(CacheSPI<Object, Object> cache2)
- {
- assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B,
"name"));
- assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B,
"age"));
- assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C,
"name"));
- assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C,
"age"));
- }
-
- private void writeInitialData(final CacheSPI<Object, Object> cache1)
- {
- cache1.put(A_B, "name", JOE);
- cache1.put(A_B, "age", TWENTY);
- cache1.put(A_C, "name", BOB);
- cache1.put(A_C, "age", FORTY);
- }
-
- private void writingThreadTest(boolean tx, String name) throws InterruptedException
- {
- CacheSPI<Object, Object> cache1 = null, cache2 = null;
- try
- {
- cache1 = createCache(name);
-
- writeInitialData(cache1);
-
- // Delay the transient copy, so that we get a more thorough log test
- cache1.put("/delay", "delay", new DelayTransfer());
-
- WritingRunner writer = new WritingRunner(cache1, tx);
- Thread writerThread = new Thread(writer);
- writerThread.start();
-
- cache2 = createCache(name);
-
- // Pause to give caches time to see each other
- TestingUtil.blockUntilViewsReceived(new CacheSPI[] { cache1, cache2 }, 60000);
-
- writer.stop();
- writerThread.join();
-
- verifyInitialData(cache2);
-
- int count = writer.result();
-
- for (int c = 0; c < count; c++)
- assertEquals(c, cache2.get("/test" + c, "test"));
- }
- finally
- {
- TestingUtil.killCaches(cache1, cache2);
- }
- }
-}