There are several ways you could go with this, and the right one depends on
what you're trying to test. Here is a demo of how to use the Apicurio
Registry with a Java bean payload and an auto-generated Avro schema:
That directory has a producer and consumer app. It also has a Java bean
called Greeting. This particular example allows the producer app to
auto-register the Avro schema (generated from the Java bean class) in the
registry. If you already have the Avro schema and you want to register it
e.g. as part of "setUp()" then you can use the Apicurio Registry Rest
Client for that. An example of using the client can be found here:
That's a little bit more complicated than you'll need - just create the
client and then do this:
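
A rough sketch of the same registration done against the registry's REST API with the JDK HTTP client (Java 11+), rather than through the Rest Client mentioned above. The `/artifacts` path and the `X-Registry-ArtifactType` / `X-Registry-ArtifactId` headers are assumptions based on the 1.x REST API, and the class and method names are made up for illustration; check them against the registry version you run.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegisterSchemaSketch {

    /**
     * Builds (but does not send) a request that creates an Avro artifact.
     * registryApiUrl is assumed to look like http://host:port/api.
     */
    static HttpRequest registerRequest(String registryApiUrl, String artifactId, String avroSchemaJson) {
        return HttpRequest.newBuilder(URI.create(registryApiUrl + "/artifacts"))
                .header("Content-Type", "application/json")
                // Assumed header names from the Apicurio Registry 1.x REST API.
                .header("X-Registry-ArtifactType", "AVRO")
                .header("X-Registry-ArtifactId", artifactId)
                .POST(HttpRequest.BodyPublishers.ofString(avroSchemaJson))
                .build();
    }

    /** Sends the request and returns the HTTP status code so the caller can check it. */
    static int register(String registryApiUrl, String artifactId, String avroSchemaJson)
            throws IOException, InterruptedException {
        HttpRequest request = registerRequest(registryApiUrl, artifactId, avroSchemaJson);
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

In a test you would call `register(...)` once per schema from `setUp()`, after the registry container is up, with the artifact ID that your artifact ID strategy is going to look up.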
Note that the Avro example I linked above uses a String as the key and the
Java bean as the value. Since you're using Java beans for both, your config
will need to be a little different. Specifically, you'll need to use
AvroKafkaSerializer for both the key and value serializers, and you'll need
to set REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM to a different strategy
than the example does (use TopicIdStrategy instead of SimpleTopicIdStrategy).
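
A minimal sketch of what that producer config could look like, written with plain string keys so it compiles without the Kafka or Apicurio jars. The property keys (`apicurio.registry.url`, `apicurio.registry.artifact-id`) and the fully qualified class names are assumptions based on the 1.x serde module; in real code you would normally use the constants and classes from the library instead. As far as I know, TopicIdStrategy derives the artifact ID as `<topic>-key` or `<topic>-value`, which is what keeps the key and value schemas apart.

```java
import java.util.HashMap;
import java.util.Map;

public class AvroSerdeConfigSketch {

    // Assumed property keys and class names (Apicurio Registry 1.x serde module).
    static final String REGISTRY_URL = "apicurio.registry.url";
    static final String ARTIFACT_ID_STRATEGY = "apicurio.registry.artifact-id";
    static final String AVRO_SERIALIZER =
            "io.apicurio.registry.utils.serde.AvroKafkaSerializer";
    static final String TOPIC_ID_STRATEGY =
            "io.apicurio.registry.utils.serde.strategy.TopicIdStrategy";

    /** Producer settings that use the Avro serializer for both key and value. */
    static Map<String, Object> producerConfig(String bootstrapServers, String registryUrl) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("key.serializer", AVRO_SERIALIZER);
        config.put("value.serializer", AVRO_SERIALIZER);
        config.put(REGISTRY_URL, registryUrl);
        config.put(ARTIFACT_ID_STRATEGY, TOPIC_ID_STRATEGY);
        return config;
    }

    public static void main(String[] args) {
        // Example values only; use your embedded broker and registry container addresses.
        producerConfig("localhost:9092", "http://localhost:8080/api")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

One design note: Kafka only calls `configure()` on serializers it creates itself from the config, so naming the serializer classes in the map (rather than passing `new AvroKafkaSerializer<>()` instances to the factory) is what lets the registry settings reach them.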
Hopefully that will get you started.
On Wed, Sep 23, 2020 at 3:14 AM Janez Bindas <janez.bindas(a)gmail.com> wrote:
Hi Eric,
I still need help. I was able to start apicurio-registry-mem in a
Testcontainers container as part of my test suite. But now I need the
configuration to connect the embedded Kafka to that container and register
the generated schema. Do you have any suggestions for that?
Janez Bindas
On 22 Sep 2020, at 18:19, Eric Wittmann <eric.wittmann(a)redhat.com> wrote:
My suggestion is to use the in-memory version of Apicurio Registry via
docker:
https://hub.docker.com/r/apicurio/apicurio-registry-mem
If you combine that with TestContainers you should be able to start up the
registry as part of your JUnit test setup and have it available for the
test:
https://www.testcontainers.org/
We don't currently have a version of the registry that is easily
embeddable in a unit test (or other Java app) in any other way.
-Eric
On Tue, Sep 22, 2020 at 11:24 AM Janez Bindas <janez.bindas(a)gmail.com>
wrote:
> Hi all,
>
> I would like to test a Kafka workflow with Apicurio Registry. I wasn't able
> to register the schema in memory. Can you tell me how to do it?
>
> This is my test class:
>
> @EmbeddedKafka
> @ExtendWith(SpringExtension.class)
> @TestInstance(TestInstance.Lifecycle.PER_CLASS)
> public class KafkaTest {
>
>     private static final String TOPIC = "foobar";
>
>     @Autowired
>     private EmbeddedKafkaBroker embeddedKafkaBroker;
>
>     BlockingQueue<ConsumerRecord<FooBarKey, FooBarValue>> records;
>
>     KafkaMessageListenerContainer<FooBarKey, FooBarValue> container;
>
>     @BeforeEach
>     void setUp() {
>         Map<String, Object> configs = new HashMap<>(
>                 KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
>         DefaultKafkaConsumerFactory<FooBarKey, FooBarValue> consumerFactory =
>                 new DefaultKafkaConsumerFactory<>(configs,
>                         new AvroKafkaDeserializer<>(), new AvroKafkaDeserializer<>());
>         ContainerProperties containerProperties = new ContainerProperties(TOPIC);
>         container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
>         records = new LinkedBlockingQueue<>();
>         container.setupMessageListener(
>                 (MessageListener<FooBarKey, FooBarValue>) records::add);
>         container.start();
>         ContainerTestUtils.waitForAssignment(container,
>                 embeddedKafkaBroker.getPartitionsPerTopic());
>     }
>
>     @AfterEach
>     void tearDown() {
>         container.stop();
>     }
>
>     @Test
>     public void testKafka() throws InterruptedException {
>         Map<String, Object> configs = new HashMap<>(
>                 KafkaTestUtils.producerProps(embeddedKafkaBroker));
>         Producer<Object, Object> producer = new DefaultKafkaProducerFactory<>(configs,
>                 new AvroKafkaSerializer<>(), new AvroKafkaSerializer<>()).createProducer();
>
>         FooBarKey fooBarKey = new FooBarKey();
>         fooBarKey.setId("33217330-1968-40c6-974a-d5aae70f3692");
>
>         FooBarValue fooBarValue = new FooBarValue();
>         fooBarValue.setBar("test");
>         fooBarValue.setFoo("Test2");
>
>         producer.send(new ProducerRecord<>(TOPIC, fooBarKey, fooBarValue));
>         producer.flush();
>
>         ConsumerRecord<FooBarKey, FooBarValue> singleRecord =
>                 records.poll(100, TimeUnit.MILLISECONDS);
>         Assertions.assertThat(singleRecord).isNotNull();
>     }
>
> }
>
> avro file for key:
>
> {
>   "type" : "record",
>   "name" : "FooBarKey",
>   "namespace" : "com.umwerk.vbank",
>   "doc" : "FooBar KEY schema",
>   "fields" : [ {
>     "name" : "id",
>     "type" : "string",
>     "doc" : "FooBar Id",
>     "logicalType" : "uuid"
>   } ]
> }
>
> avro file for value:
>
> {
>   "type" : "record",
>   "name" : "FooBarValue",
>   "namespace" : "com.umwerk.vbank",
>   "doc" : "FooBar VALUE schema",
>   "fields" : [ {
>     "name" : "foo",
>     "type" : "string",
>     "doc" : "Test foo property"
>   }, {
>     "name" : "bar",
>     "type" : "string",
>     "doc" : "Test bar property"
>   } ]
> }
>
>
> Regards, Janez Bindas
>
> _______________________________________________
> Apicurio mailing list -- apicurio(a)lists.jboss.org
> To unsubscribe send an email to apicurio-leave(a)lists.jboss.org
>
--
Eric Wittmann
Principal Software Engineer - Apicurio - Red Hat
He / Him / His
eric.wittmann(a)redhat.com