[infinispan-issues] [JBoss JIRA] (ISPN-9068) Indexer throw exception in cluster mode

Sergey Chernolyas (JIRA) issues at jboss.org
Thu Apr 12 04:36:00 EDT 2018


    [ https://issues.jboss.org/browse/ISPN-9068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13559979#comment-13559979 ] 

Sergey Chernolyas commented on ISPN-9068:
-----------------------------------------


{code:java}
package ru.beeline.re.infinispan.marshaller;

import ru.beeline.re.entities.News;

import java.io.IOException;

/**
 * ProtoStream message marshaller for {@link News}.
 *
 * <p>Both directions handle the fields by name, in the order
 * {@code timestamp}, {@code type}, {@code channelId}, {@code id}.
 *
 * @author Sergey_Chernolyas
 */
public class NewsMarshaller extends AbstractBaseMarshaller<News> {

    /**
     * @return the entity class handled by this marshaller, i.e. {@code News.class}
     */
    @Override
    public Class<? extends News> getJavaClass() {
        return News.class;
    }

    /**
     * Reads the four {@link News} fields from the stream and assembles the entity.
     *
     * @param reader source of the named fields
     * @return a new {@link News} populated with the values that were read
     * @throws IOException if the reader fails
     */
    @Override
    public News readFrom(ProtoStreamReader reader) throws IOException {
        // Read everything up front, in the same order the fields are written below.
        final long timestamp = reader.readLong("timestamp");
        final String type = reader.readString("type");
        final String channelId = reader.readString("channelId");
        final String id = reader.readString("id");

        return new News(id, timestamp, channelId, type);
    }

    /**
     * Writes the four {@link News} fields to the stream.
     *
     * @param writer destination of the named fields
     * @param news   entity whose state is written
     * @throws IOException if the writer fails
     */
    @Override
    public void writeTo(ProtoStreamWriter writer, News news) throws IOException {
        final long timestamp = news.getTimestamp();
        writer.writeLong("timestamp", timestamp);

        final String type = news.getType();
        writer.writeString("type", type);

        final String channelId = news.getChannelId();
        writer.writeString("channelId", channelId);

        final String id = news.getId();
        writer.writeString("id", id);
    }
}
{code}


{code:java}
package ru.beeline.re.entities;

import org.infinispan.protostream.annotations.ProtoDoc;
import org.infinispan.protostream.annotations.ProtoField;

import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Cache entity for a news item: an id, a timestamp, a type and a channel id.
 *
 * <p>The ProtoStream annotations declare four required protobuf fields
 * (timestamp = 1, type = 2, channelId = 3, id = 4); the {@code @ProtoDoc} texts carry
 * the indexing hints for the entity and for the timestamp, type and channelId fields.
 */
@ProtoDoc("@Indexed")
public class News implements Serializable {

    private static final long serialVersionUID = -7389340513007964365L;
    private String id;

    private long timestamp;

    private String type;

    private String channelId;

    /** Creates an empty instance; state is supplied through the setters. */
    public News() {
    }

    /**
     * Creates a fully populated instance.
     *
     * @param id        identifier of the news item
     * @param time      value stored as the timestamp
     * @param channelId identifier of the channel
     * @param type      type of the news item
     */
    public News(String id, long time, String channelId, String type) {
        this.id = id;
        this.timestamp = time;
        this.channelId = channelId;
        this.type = type;
    }

    /** @return the identifier of this news item */
    public String getId() {
        return this.id;
    }

    /** @param id the identifier of this news item */
    @ProtoField(number = 4, required = true)
    public void setId(String id) {
        this.id = id;
    }

    /** @return the timestamp of this news item */
    @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.NO, store = Store.NO)")
    @ProtoDoc("@NumericField")
    @ProtoDoc("@SortableField")
    @ProtoField(number = 1, required = true)
    public long getTimestamp() {
        return this.timestamp;
    }

    /** @param time the timestamp of this news item */
    public void setTimestamp(long time) {
        this.timestamp = time;
    }

    /** @return the identifier of the channel */
    @ProtoField(number = 3, required = true)
    @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.NO, store = Store.NO)")
    public String getChannelId() {
        return this.channelId;
    }

    /** @param channelId the identifier of the channel */
    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }

    /** @return the type of this news item */
    @ProtoDoc("@Field(index=Index.YES, analyze = Analyze.NO, store = Store.NO)")
    @ProtoField(number = 2, required = true)
    public String getType() {
        return this.type;
    }

    /** @param type the type of this news item */
    public void setType(String type) {
        this.type = type;
    }

    /**
     * Two instances are equal when they have exactly the same class and all four
     * properties (timestamp, id, type, channelId) are equal.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (!getClass().equals(o.getClass())) {
            return false;
        }
        final News that = (News) o;
        if (this.timestamp != that.timestamp) {
            return false;
        }
        if (!Objects.equals(this.id, that.id)) {
            return false;
        }
        if (!Objects.equals(this.type, that.type)) {
            return false;
        }
        return Objects.equals(this.channelId, that.channelId);
    }

    /** Hash over the same four properties that {@link #equals(Object)} compares. */
    @Override
    public int hashCode() {
        return Objects.hash(id, timestamp, type, channelId);
    }

    /**
     * Classification of an item, decided from a (type, alias) pair by a
     * case-insensitive predicate attached to each constant.
     */
    public enum Type {
        /** Type is "feedItem" and the alias is neither "flocktory" nor "Beeline". */
        NEWS((type, alias) ->
                "feedItem".equalsIgnoreCase(type)
                        && !("flocktory".equalsIgnoreCase(alias)
                        || "Beeline".equalsIgnoreCase(alias))),
        /** Type is "Action", or the alias is "flocktory" or "Beeline". */
        OFFER((type, alias) ->
                "Action".equalsIgnoreCase(type)
                        || "flocktory".equalsIgnoreCase(alias)
                        || "Beeline".equalsIgnoreCase(alias));

        private final BiPredicate<String, String> test;

        Type(BiPredicate<String, String> test) {
            this.test = test;
        }

        /**
         * Finds the constant whose predicate accepts the given pair.
         *
         * @param type  type value to classify
         * @param alias alias value to classify
         * @return the name of the single matching constant, or an empty optional if none matches
         * @throws IllegalStateException if more than one constant matches
         */
        public static Optional<String> getType(String type, String alias) {
            String matchedName = null;
            for (Type candidate : values()) {
                if (!candidate.test.test(type, alias)) {
                    continue;
                }
                if (matchedName != null) {
                    // A second hit means the predicates overlap for this input.
                    throw new IllegalStateException("For type " + type + " and alias " + alias + " was found more than one types");
                }
                matchedName = candidate.name();
            }
            return Optional.ofNullable(matchedName);
        }
    }
}

{code}

client code:

{code:java}
package ru.beeline.re.infinispan;

import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ClientIntelligence;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.configuration.NearCacheMode;
import org.infinispan.client.hotrod.impl.transport.tcp.RoundRobinBalancingStrategy;
import org.infinispan.client.hotrod.marshall.ProtoStreamMarshaller;
import org.infinispan.protostream.FileDescriptorSource;
import org.infinispan.protostream.SerializationContext;
import ru.beeline.re.entities.News;
import ru.beeline.re.utils.CacheName;

import java.io.IOException;
import java.util.UUID;

/**
 * Manual reproducer: connects to a Hot Rod server, puts one {@link News} entry into the
 * {@code NEWS} cache and prints the cache statistics before and after the put.
 *
 * @author Sergey Chernolyas &lt;sergey_chernolyas at gmail.com&gt;
 */
public class NewsIndexTest {

    /**
     * Runs the reproducer.
     *
     * @param argv ignored
     * @throws IOException kept for signature compatibility
     */
    public static void main(String[] argv) throws IOException {
        RemoteCacheManager remoteCacheManager = infinispan();
        try {
            News news = new News();
            news.setId(UUID.randomUUID().toString());
            news.setChannelId(UUID.randomUUID().toString());
            news.setType(News.Type.NEWS.name());
            news.setTimestamp(0);

            RemoteCache<String, News> newsCache = remoteCacheManager.getCache(CacheName.NEWS.name());
            System.out.println("stat : " + newsCache.stats().getStatsMap());
            newsCache.put("testnews", news);
            System.out.println("stat : " + newsCache.stats().getStatsMap());
        } finally {
            // Always release the client, even when the put fails on the server side.
            remoteCacheManager.stop();
        }
    }

    /**
     * Builds a remote cache manager that uses the ProtoStream marshaller, then registers
     * the proto schemas and the project's marshaller provider in its serialization context.
     *
     * <p>The caller owns the returned manager and must call {@code stop()} on it.
     *
     * @return the configured manager
     * @throws RuntimeException wrapping the {@link IOException} if the proto files cannot be
     *                          loaded or registered; the manager is stopped before rethrowing
     */
    private static RemoteCacheManager infinispan() {
        ConfigurationBuilder builder = new ConfigurationBuilder();
        builder.marshaller(new ProtoStreamMarshaller());
        // Two-node variant:
        //builder.addServers("172.21.206.13:11322;172.21.206.202:11322");
        builder.addServers("172.21.206.13:11322");
        builder.balancingStrategy(new RoundRobinBalancingStrategy())
                .clientIntelligence(ClientIntelligence.HASH_DISTRIBUTION_AWARE);
        builder.nearCache().mode(NearCacheMode.INVALIDATED).maxEntries(1_000);

        RemoteCacheManager remoteCacheManager = new RemoteCacheManager(builder.build(true));
        try {
            SerializationContext serCtx = ProtoStreamMarshaller.getSerializationContext(remoteCacheManager);

            FileDescriptorSource fds = new FileDescriptorSource();
            fds.addProtoFiles("/infinispan/re_model.proto", "/infinispan/re_entities.proto", "/infinispan/re_shop_entities.proto");
            serCtx.registerProtoFiles(fds);
            serCtx.registerMarshallerProvider(new MarshallerProvider());

            // Diagnostic output: confirm every registered POJO class is marshallable.
            for (Class<?> pojoClass : MarshallerProvider.getClassMessageMarshallerMap().keySet()) {
                System.out.println(" can marshall   " + pojoClass + "  ?  :" + serCtx.canMarshall(pojoClass));
            }

            return remoteCacheManager;
        } catch (IOException e) {
            // Do not leak the already-created client when schema registration fails.
            remoteCacheManager.stop();
            throw new RuntimeException(e);
        } catch (RuntimeException e) {
            remoteCacheManager.stop();
            throw e;
        }
    }
}

{code}





> Indexer throw exception in cluster mode
> ---------------------------------------
>
>                 Key: ISPN-9068
>                 URL: https://issues.jboss.org/browse/ISPN-9068
>             Project: Infinispan
>          Issue Type: Bug
>    Affects Versions: 9.2.1.Final
>            Reporter: Sergey Chernolyas
>            Priority: Critical
>         Attachments: re_clustered_rocksdb.xml, re_entities.proto
>
>
> I have a cluster that contains two hosts.
> One of the caches has indexing enabled.
> When I try to add an entity (see entity News in the attached proto) to that cache, I get an exception:
> __________
> 2018-04-11 21:17:54,695 ERROR [org.infinispan.interceptors.impl.InvocationContextInterceptor] (jgroups-17,server2) ISPN000136: Error executing command PutKeyValueCommand, writing keys [WrappedByteArray{bytes=[B0x4A24303030303030..[38], hashCode=61362266}]: org.hibernate.search.exception.SearchException: Unable to perform work. Entity Class is not @Indexed nor hosts @ContainedIn: org.infinispan.query.backend.QueryInterceptor$1
>         at org.hibernate.search.backend.impl.PerTransactionWorker.performWork(PerTransactionWorker.java:63)
>         at org.infinispan.query.backend.QueryInterceptor.performSearchWorks(QueryInterceptor.java:383)
>         at org.infinispan.query.backend.QueryInterceptor.removeFromIndexes(QueryInterceptor.java:337)
>         at org.infinispan.query.backend.QueryInterceptor.processChange(QueryInterceptor.java:448)
>         at org.infinispan.query.backend.QueryInterceptor.lambda$handleDataWriteCommand$0(QueryInterceptor.java:184)
>         at org.infinispan.interceptors.InvocationSuccessAction.apply(InvocationSuccessAction.java:19)
>         at org.infinispan.interceptors.impl.QueueAsyncInvocationStage.invokeQueuedHandlers(QueueAsyncInvocationStage.java:118)
>         at org.infinispan.interceptors.impl.QueueAsyncInvocationStage.accept(QueueAsyncInvocationStage.java:81)
>         at org.infinispan.interceptors.impl.QueueAsyncInvocationStage.accept(QueueAsyncInvocationStage.java:30)
>         at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>         at org.infinispan.remoting.transport.AbstractRequest.complete(AbstractRequest.java:66)
>         at org.infinispan.remoting.transport.impl.MultiTargetRequest.onResponse(MultiTargetRequest.java:102)
>         at org.infinispan.remoting.transport.impl.RequestRepository.addResponse(RequestRepository.java:53)
>         at org.infinispan.remoting.transport.jgroups.JGroupsTransport.processResponse(JGroupsTransport.java:1304)
>         at org.infinispan.remoting.transport.jgroups.JGroupsTransport.processMessage(JGroupsTransport.java:1207)
>         at org.infinispan.remoting.transport.jgroups.JGroupsTransport.access$200(JGroupsTransport.java:123)
>         at org.infinispan.remoting.transport.jgroups.JGroupsTransport$ChannelCallbacks.receive(JGroupsTransport.java:1342)
>         at org.jgroups.JChannel.up(JChannel.java:819)
>         at org.jgroups.fork.ForkProtocolStack.up(ForkProtocolStack.java:134)
>         at org.jgroups.stack.Protocol.up(Protocol.java:340)
>         at org.jgroups.protocols.FORK.up(FORK.java:134)
>         at org.jgroups.protocols.FRAG3.up(FRAG3.java:171)
>         at org.jgroups.protocols.FlowControl.up(FlowControl.java:343)
>         at org.jgroups.protocols.FlowControl.up(FlowControl.java:343)
>         at org.jgroups.protocols.pbcast.GMS.up(GMS.java:864)
>         at org.jgroups.protocols.pbcast.STABLE.up(STABLE.java:240)
>         at org.jgroups.protocols.UNICAST3.deliverMessage(UNICAST3.java:1002)
>         at org.jgroups.protocols.UNICAST3.handleDataReceived(UNICAST3.java:728)
>         at org.jgroups.protocols.UNICAST3.up(UNICAST3.java:383)
>         at org.jgroups.protocols.pbcast.NAKACK2.up(NAKACK2.java:600)
>         at org.jgroups.protocols.VERIFY_SUSPECT.up(VERIFY_SUSPECT.java:119)
>         at org.jgroups.protocols.FD_ALL.up(FD_ALL.java:199)
>         at org.jgroups.protocols.FD_SOCK.up(FD_SOCK.java:252)
>         at org.jgroups.protocols.MERGE3.up(MERGE3.java:276)
>         at org.jgroups.protocols.Discovery.up(Discovery.java:267)
>         at org.jgroups.protocols.TP.passMessageUp(TP.java:1248)
>         at org.jgroups.util.SubmitToThreadPool$SingleMessageHandler.run(SubmitToThreadPool.java:87)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> _____



--
This message was sent by Atlassian JIRA
(v7.5.0#75005)


More information about the infinispan-issues mailing list