Junit test for Apicurio registry
by Janez Bindas
Hi all,
I would like to test Kafka workflow with Apicurio registry. I wasn't able
to registry the schema in memory. Can you tell me how to do it?
This is my test class:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class KafkaTest {
private static final String TOPIC = "foobar";
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
BlockingQueue<ConsumerRecord<FooBarKey, FooBarValue>> records;
KafkaMessageListenerContainer<FooBarKey, FooBarValue> container;
@BeforeEach
void setUp() {
Map<String, Object> configs = new
HashMap<>(KafkaTestUtils.consumerProps("consumer", "false",
embeddedKafkaBroker));
DefaultKafkaConsumerFactory<FooBarKey, FooBarValue>
consumerFactory = new DefaultKafkaConsumerFactory<>(configs, new
AvroKafkaDeserializer<>(), new AvroKafkaDeserializer<>());
ContainerProperties containerProperties = new
ContainerProperties(TOPIC);
container = new
KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
records = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<FooBarKey,
FooBarValue>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafkaBroker.getPartitionsPerTopic());
}
@AfterEach
void tearDown() {
container.stop();
}
@Test
public void testKafka() throws InterruptedException {
Map<String, Object> configs = new
HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
Producer<Object, Object> producer = new
DefaultKafkaProducerFactory<>(configs, new AvroKafkaSerializer<>(),
new AvroKafkaSerializer<>()).createProducer();
FooBarKey fooBarKey = new FooBarKey();
fooBarKey.setId("33217330-1968-40c6-974a-d5aae70f3692");
FooBarValue fooBarValue = new FooBarValue();
fooBarValue.setBar("test");
fooBarValue.setFoo("Test2");
producer.send(new ProducerRecord<>(TOPIC, fooBarKey, fooBarValue));
producer.flush();
ConsumerRecord<FooBarKey, FooBarValue> singleRecord =
records.poll(100, TimeUnit.MILLISECONDS);
Assertions.assertThat(singleRecord).isNotNull();
}
}
avro file for key:
{
"type" : "record",
"name" : "FooBarKey",
"namespace" : "com.umwerk.vbank",
"doc" : "FooBar KEY schema",
"fields" : [ {
"name" : "id",
"type" : "string",
"doc" : "FooBar Id",
"logicalType" : "uuid"
} ]
}
avro file for value:
{
"type" : "record",
"name" : "FooBarValue",
"namespace" : "com.umwerk.vbank",
"doc" : "FooBar VALUE schema",
"fields" : [ {
"name" : "foo",
"type" : "string",
"doc" : "Test foo property"
}, {
"name" : "bar",
"type" : "string",
"doc" : "Test bar property"
} ]
}
Regards, Janez Bindas
1 year, 3 months
Connect Apicurio schema registry with SSL to Kafka
by Janez Bindas
Hi all,
We have a problem with settings of Apicurio Schema Registry. We have basic configuration of Kafka cluster with SSL. But when we try to connect Apicurio with Kafka we get errors.
This is our docker script to run Apicurio.
docker run -it --env KAFKA_BOOTSTRAP_SERVERS=b-3.dev.kdm41f.c4.kafka.eu <http://dev.kdm41f.c4.kafka.eu/>-central-1.amazonaws.com:9094 --env 'JAVA_OPTIONS=-Dquarkus.profile=prod -D%prod.registry.streams.topology.security.protocol=SSL -D%prod.registry.kafka.snapshot-consumer.security.protocol=SSL -Dsecurity.protocol=SSL' apicurio/apicurio-registry-kafka:latest
Output:
…..
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
…..
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
…..
I think that the first time Apicurio tries to connect it connects with PLAINTEXT (in red) and second times it connects with SSL (in red).
Can you please help me to configurate Apicurio that use SSL?
Regards Janez Bindas
4 years