Connect Apicurio schema registry with SSL to Kafka
by Janez Bindas
Hi all,
We have a problem with the configuration of the Apicurio Schema Registry. We have a basic Kafka cluster set up with SSL, but when we try to connect Apicurio to Kafka we get errors.
This is the docker command we use to run Apicurio:
docker run -it --env KAFKA_BOOTSTRAP_SERVERS=b-3.dev.kdm41f.c4.kafka.eu-central-1.amazonaws.com:9094 --env 'JAVA_OPTIONS=-Dquarkus.profile=prod -D%prod.registry.streams.topology.security.protocol=SSL -D%prod.registry.kafka.snapshot-consumer.security.protocol=SSL -Dsecurity.protocol=SSL' apicurio/apicurio-registry-kafka:latest
Output:
…..
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
…..
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
security.providers = null
send.buffer.bytes = 131072
…..
I think that on the first attempt Apicurio connects with PLAINTEXT (the first block of output above) and on the second with SSL (the second block).
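For reference, this is the standard Kafka client SSL configuration we expect each internal client to end up with (a minimal sketch; the truststore path and password are placeholders, not our real values):

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class SslClientProps {
    // Standard Kafka client SSL settings; the truststore path and
    // password below are placeholders.
    public static Properties sslProps() {
        Properties props = new Properties();
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/client.truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        return props;
    }
}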
Can you please help me configure Apicurio to use SSL?
Regards, Janez Bindas
Apicurio Registry 2.0.0!
by Eric Wittmann
Hey everyone. Happy Monday.
At the end of last week we released version 2.0.0.RC1 of Apicurio Registry
(product name: Red Hat Integration - Service Registry). This new major
version has a lot of changes and improvements. Here is a quick list of cool
new stuff:
* Authentication via integration with Keycloak/RHSSO
* Support for groups of artifacts
* Updated Serdes classes for easier and more consistent configuration (see the sketch after this list)
* Support for the new CNCF Schema Registry API
* Multi-tenancy support! (This was mostly done to support MAS efforts)
* Option to store persistent registry data in a SQL database (PostgreSQL only)
* Hybrid storage implementation using Kafka (simple topic with log
compaction enabled) and an in-memory H2 database
* Improved artifact searching/filtering
* New REST client using only the Java 11 HTTP client
* Event sourcing (changes to artifacts in the registry can result in events
firing)
* And much more!
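For example, configuring a producer to use the updated Avro serializer now looks roughly like this (a minimal sketch against the 2.x serdes module; the bootstrap server and registry URL below are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.apicurio.registry.serde.SerdeConfig;
import io.apicurio.registry.serde.avro.AvroKafkaSerializer;

public class SerdeConfigExample {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AvroKafkaSerializer.class.getName());
        // Point the serde at the registry's v2 API endpoint (placeholder URL)
        props.put(SerdeConfig.REGISTRY_URL, "http://localhost:8080/apis/registry/v2");
        // Auto-create the schema artifact on first use (handy in development)
        props.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, Boolean.TRUE);
        return props;
    }
}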
For more information about the changes, you can go here:
https://github.com/Apicurio/apicurio-registry/releases/tag/2.0.0.RC1
We will be doing a lot of testing and documenting over the next couple of
weeks before releasing 2.0.0.Final. If anyone is interested in giving the
new version a try, now is a great time! (you can report any bugs you find)
You can try it out immediately using our in-memory (non-persistent)
version by doing this:
docker run -it -p 8080:8080 apicurio/apicurio-registry-mem:2.0.0.RC1
Then go to http://localhost:8080/ui or http://localhost:8080/apis to either
use the user interface or see a list of the APIs we support (respectively).
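If you would rather poke at it from code, a quick sanity check using nothing but the JDK's own HTTP client (the same client the new REST client layer is built on) could look like this:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RegistryPing {
    public static void main(String[] args) throws Exception {
        // Ask a locally running registry which APIs it exposes
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(
                URI.create("http://localhost:8080/apis")).GET().build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + "\n" + response.body());
    }
}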
Thank you to everyone who contributed to this release! I think we should
be in a great position to support both our Red Hat Integration product as
well as our Managed Services efforts with this latest major version release.
--
Eric Wittmann
Principal Software Engineer - Apicurio - Red Hat
He / Him / His
eric.wittmann(a)redhat.com
JUnit test for Apicurio registry
by Janez Bindas
Hi all,
I would like to test a Kafka workflow with the Apicurio registry. I wasn't
able to register the schema in memory. Can you tell me how to do it?
This is my test class:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class KafkaTest {

    private static final String TOPIC = "foobar";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    BlockingQueue<ConsumerRecord<FooBarKey, FooBarValue>> records;
    KafkaMessageListenerContainer<FooBarKey, FooBarValue> container;

    @BeforeEach
    void setUp() {
        Map<String, Object> configs = new HashMap<>(
                KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<FooBarKey, FooBarValue> consumerFactory =
                new DefaultKafkaConsumerFactory<>(configs,
                        new AvroKafkaDeserializer<>(), new AvroKafkaDeserializer<>());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener(
                (MessageListener<FooBarKey, FooBarValue>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container,
                embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterEach
    void tearDown() {
        container.stop();
    }

    @Test
    public void testKafka() throws InterruptedException {
        Map<String, Object> configs =
                new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<Object, Object> producer = new DefaultKafkaProducerFactory<>(
                configs, new AvroKafkaSerializer<>(), new AvroKafkaSerializer<>())
                .createProducer();
        FooBarKey fooBarKey = new FooBarKey();
        fooBarKey.setId("33217330-1968-40c6-974a-d5aae70f3692");
        FooBarValue fooBarValue = new FooBarValue();
        fooBarValue.setBar("test");
        fooBarValue.setFoo("Test2");
        producer.send(new ProducerRecord<>(TOPIC, fooBarKey, fooBarValue));
        producer.flush();
        ConsumerRecord<FooBarKey, FooBarValue> singleRecord =
                records.poll(100, TimeUnit.MILLISECONDS);
        Assertions.assertThat(singleRecord).isNotNull();
    }
}
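From the serde examples I have seen, I believe the (de)serializers have to be pointed at a running registry through the client config map, roughly like this ("apicurio.registry.url" is the serde config key from the docs; the URL is a guess, and an in-memory equivalent of it is exactly what I'm missing):

// Hypothetical wiring: hand the registry URL to the Apicurio serdes via the
// consumer config map; the URL assumes a registry running on localhost.
private Map<String, Object> consumerConfigsWithRegistry() {
    Map<String, Object> configs = new HashMap<>(
            KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
    configs.put("apicurio.registry.url", "http://localhost:8080/api");
    return configs;
}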
Avro file for the key:
{
  "type" : "record",
  "name" : "FooBarKey",
  "namespace" : "com.umwerk.vbank",
  "doc" : "FooBar KEY schema",
  "fields" : [ {
    "name" : "id",
    "type" : { "type" : "string", "logicalType" : "uuid" },
    "doc" : "FooBar Id"
  } ]
}
Avro file for the value:
{
  "type" : "record",
  "name" : "FooBarValue",
  "namespace" : "com.umwerk.vbank",
  "doc" : "FooBar VALUE schema",
  "fields" : [ {
    "name" : "foo",
    "type" : "string",
    "doc" : "Test foo property"
  }, {
    "name" : "bar",
    "type" : "string",
    "doc" : "Test bar property"
  } ]
}
Regards, Janez Bindas