Hi Eric,
I still need help. I was able to start apicurio-registry-mem in a Testcontainers
container from my test suite. Now I need the configuration to connect the embedded Kafka
to that container and register the generated schema. Do you have any suggestions for that?
Janez Bindas
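
For reference, a minimal sketch of the serializer/deserializer configuration being asked
about here. The property names ("apicurio.registry.url", "apicurio.registry.global-id"),
the GetOrCreateIdStrategy class name, and the "/api" base path are assumptions based on
the Apicurio Registry 1.x serde module and should be checked against the version in use.
RegistrySerdeConfig and withRegistry are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class RegistrySerdeConfig {

    // Assumed property names from the Apicurio Registry 1.x serde module.
    static final String REGISTRY_URL = "apicurio.registry.url";
    static final String GLOBAL_ID_STRATEGY = "apicurio.registry.global-id";

    // Assumed strategy class that registers the schema when it is not found.
    static final String GET_OR_CREATE =
            "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy";

    // Copies the Kafka client properties and adds the registry settings.
    // host and port are expected to come from the running registry container.
    static Map<String, Object> withRegistry(Map<String, Object> kafkaProps,
                                            String host, int port) {
        Map<String, Object> configs = new HashMap<>(kafkaProps);
        // "/api" is the assumed REST base path of the 1.x registry.
        configs.put(REGISTRY_URL, "http://" + host + ":" + port + "/api");
        configs.put(GLOBAL_ID_STRATEGY, GET_OR_CREATE);
        return configs;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put("bootstrap.servers", "localhost:9092");
        Map<String, Object> configs = withRegistry(base, "localhost", 8080);
        System.out.println(configs.get(REGISTRY_URL));
    }
}
```

In the test, host and port would typically be taken from the Testcontainers container
(for example getHost() and getMappedPort(8080), assuming the image listens on 8080), and
the base map from KafkaTestUtils.producerProps(...) or consumerProps(...). Note that Kafka
only calls configure() on serializers it instantiates from class names in the properties;
instances passed directly to DefaultKafkaProducerFactory or DefaultKafkaConsumerFactory
probably need configure(configs, isKey) called on them by hand.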
On 22 Sep 2020, at 18:19, Eric Wittmann
<eric.wittmann(a)redhat.com> wrote:
My suggestion is to use the in-memory version of Apicurio Registry via docker:
https://hub.docker.com/r/apicurio/apicurio-registry-mem
If you combine that with TestContainers you should be able to start up the registry as
part of your JUnit test setup and have it available for the test:
https://www.testcontainers.org/
We don't currently have a version of the registry that is easily embeddable in a unit
test (or other Java app) in any other way.
-Eric
On Tue, Sep 22, 2020 at 11:24 AM Janez Bindas <janez.bindas(a)gmail.com> wrote:
Hi all,
I would like to test a Kafka workflow with Apicurio Registry. I wasn't able to register
the schema in memory. Can you tell me how to do it?
This is my test class:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class KafkaTest {

    private static final String TOPIC = "foobar";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    BlockingQueue<ConsumerRecord<FooBarKey, FooBarValue>> records;
    KafkaMessageListenerContainer<FooBarKey, FooBarValue> container;

    @BeforeEach
    void setUp() {
        Map<String, Object> configs = new HashMap<>(
                KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<FooBarKey, FooBarValue> consumerFactory =
                new DefaultKafkaConsumerFactory<>(configs,
                        new AvroKafkaDeserializer<>(), new AvroKafkaDeserializer<>());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener(
                (MessageListener<FooBarKey, FooBarValue>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterEach
    void tearDown() {
        container.stop();
    }

    @Test
    public void testKafka() throws InterruptedException {
        Map<String, Object> configs = new HashMap<>(
                KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<Object, Object> producer = new DefaultKafkaProducerFactory<>(configs,
                new AvroKafkaSerializer<>(), new AvroKafkaSerializer<>()).createProducer();
        FooBarKey fooBarKey = new FooBarKey();
        fooBarKey.setId("33217330-1968-40c6-974a-d5aae70f3692");
        FooBarValue fooBarValue = new FooBarValue();
        fooBarValue.setBar("test");
        fooBarValue.setFoo("Test2");
        producer.send(new ProducerRecord<>(TOPIC, fooBarKey, fooBarValue));
        producer.flush();
        ConsumerRecord<FooBarKey, FooBarValue> singleRecord =
                records.poll(100, TimeUnit.MILLISECONDS);
        Assertions.assertThat(singleRecord).isNotNull();
    }
}
Avro file for key:
{
  "type" : "record",
  "name" : "FooBarKey",
  "namespace" : "com.umwerk.vbank",
  "doc" : "FooBar KEY schema",
  "fields" : [ {
    "name" : "id",
    "type" : "string",
    "doc" : "FooBar Id",
    "logicalType" : "uuid"
  } ]
}
Avro file for value:
{
  "type" : "record",
  "name" : "FooBarValue",
  "namespace" : "com.umwerk.vbank",
  "doc" : "FooBar VALUE schema",
  "fields" : [ {
    "name" : "foo",
    "type" : "string",
    "doc" : "Test foo property"
  }, {
    "name" : "bar",
    "type" : "string",
    "doc" : "Test bar property"
  } ]
}
Regards, Janez Bindas
--
Eric Wittmann
Principal Software Engineer - Apicurio - Red Hat
He / Him / His
eric.wittmann(a)redhat.com