Getting started with Red Hat Integration service registry

December 16, 2019
Hugo Guerrero

    New projects require some help. Imagine you are getting ready to start that new feature your business has been asking for over the last couple of months. Your team is ready to start coding the awesome new thing that will change your business.

    To achieve it, the team will need to interact with your organization's existing software components. Your developers will need to work with the API services and event endpoints already available in your architecture, and before they can send and process information, they need to know the structure, or schema, expected by those services.

    Red Hat announced the Technical Preview of the Red Hat Integration service registry to help teams govern their service schemas. The service registry is a store for schema (and API design) artifacts that provides a REST API and a set of optional rules for enforcing content validity and evolution. Teams can now use the service registry to query for the schemas required by each service endpoint, or to register and store new structures for future use.

    Service registry overview

    The Red Hat Integration service registry is a datastore for standard event schemas and API designs. It enables developers to decouple the structure of their data from their applications, and to share and manage their data structures using a REST interface. The Red Hat service registry is built on the Apicurio Registry open source community project.

    The service registry handles the following data formats:

    • Apache Avro
    • JSON Schema
    • Protobuf (protocol buffers)
    • OpenAPI
    • AsyncAPI

    You can configure rules for each artifact added to the registry to govern content evolution. All rules configured for an artifact must pass before a new version can be uploaded to the registry. The goal of these rules is to prevent invalid content from being added to the registry.
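
    Rules are managed through the same REST API as the artifacts. As a quick sketch (assuming the Apicurio Registry v1 rules endpoint and the local registry we start later in this article), a global validity rule could be enabled like this:

    curl -X POST -H "Content-type: application/json" \
      --data '{"type": "VALIDITY", "config": "FULL"}' \
      http://localhost:8081/api/rules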

    Using the service registry with Apache Kafka

    Because Apache Kafka handles message values as opaque byte arrays, using a serialization system is strongly recommended. Apache Avro is one of the most commonly used data formats for encoding Kafka data. Avro is a data serialization system that relies on schemas defined in JSON and supports schema versioning. Based on our schema, Avro converts our data into byte arrays to send to Kafka, and consumers can use the same schema to correctly deserialize the data they receive.
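
    To make that concrete, the following standalone sketch (not part of this article's project) shows how Avro turns a record into the byte array that Kafka actually transports. It uses the same symbol/price record shape we define later in price-schema.avsc:

    import java.io.ByteArrayOutputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;

    public class AvroBytesSketch {
        public static void main(String[] args) throws Exception {
            // Same record shape as the price-schema.avsc used later in this article
            Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"price\",\"namespace\":\"com.redhat\","
                + "\"fields\":[{\"name\":\"symbol\",\"type\":\"string\"},{\"name\":\"price\",\"type\":\"string\"}]}");

            GenericRecord record = new GenericData.Record(schema);
            record.put("symbol", "RHT");
            record.put("price", "87.42");

            // Encode the record into the opaque byte array that Kafka will transport
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            System.out.println("Serialized record to " + out.toByteArray().length + " bytes");
        }
    }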

    The Red Hat Integration service registry provides full Kafka schema registry support for storing Avro schemas. The provided Maven repository also includes a custom Kafka client serializer/deserializer (SerDe). Kafka client developers can use these Java classes to integrate with the registry and to push and pull their schemas from it at runtime.

    Running Kafka and Registry

    For this example, we will use a local docker-compose environment running a Strimzi-based Kafka cluster and the service registry. The service registry uses Kafka as its main data store, but you can also use in-memory or JPA-based (currently unsupported) stores. We will use the in-memory store to keep things simple. Keep in mind that neither docker-compose nor the in-memory storage is recommended for production use.

    To begin, download my preconfigured docker-compose.yaml file and start the services locally:

    $ docker-compose -f docker-compose.yaml up

    Kafka will be running on localhost:9092 and the registry on localhost:8081:

    Creating network "post_default" with the default driver
    Pulling zookeeper (strimzi/kafka:0.11.3-kafka-2.1.0)...
    …
    zookeeper_1  | [2019-12-09 16:56:55,407] INFO Got user-level KeeperException when processing sessionid:0x100000307a50000 type:multi cxid:0x38 zxid:0x1d txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor)
    kafka_1      | [2019-12-09 16:56:55,408] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
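
    Before moving on, you can sanity-check that the registry is reachable. The following call is a sketch against the Apicurio Registry v1 API (the /api/artifacts path is an assumption); it should return an empty JSON array, since no artifacts have been registered yet:

    $ curl -s http://localhost:8081/api/artifacts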
    

    Creating a new Quarkus project

    Now that we have started the required infrastructure, we need a simple client to send and consume messages from the Kafka cluster. In this scenario, I will create a simple Quarkus application using the MicroProfile Reactive Messaging extension for Kafka.

    First, open a new terminal window and create a new Maven project using the Quarkus plugin:

    mvn io.quarkus:quarkus-maven-plugin:1.4.2.Final:create \
        -DprojectGroupId=com.redhat \
        -DprojectArtifactId=kafka-registry \
        -Dextensions="kafka"

    After Maven downloads all the required artifacts, you will see the BUILD SUCCESS message:

    …
    [INFO] 
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  01:07 min
    [INFO] Finished at: 2019-12-09T12:17:51-05:00
    [INFO] ------------------------------------------------------------------------
    

    Open the newly created project in your preferred code editor; in my case, I will use VS Code with useful extensions, such as the Java and Quarkus ones, already installed.

    Open pom.xml, remove the quarkus-resteasy dependency, and add the following dependencies:

    <dependency>
      <groupId>org.jboss.resteasy</groupId>
      <artifactId>resteasy-jackson2-provider</artifactId>
    </dependency>
    <dependency>
      <groupId>io.apicurio</groupId>
      <artifactId>apicurio-registry-utils-serde</artifactId>
      <version>1.2.1.Final</version>
    </dependency>
    

    Create a Java class under src/main/java/com/redhat/kafka/registry/AvroRegistryExample.java with the following code:

    package com.redhat.kafka.registry;
     
    import java.io.File;
    import java.io.IOException;
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
     
    import javax.enterprise.context.ApplicationScoped;
     
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericData.Record;
    import org.eclipse.microprofile.reactive.messaging.Outgoing;
     
    import io.reactivex.Flowable;
    import io.smallrye.reactive.messaging.kafka.KafkaRecord;
     
    @ApplicationScoped
    public class AvroRegistryExample {
     
       private Random random = new Random();
       private String[] symbols = new String[] { "RHT", "IBM", "MSFT", "AMZN" };
     
       @Outgoing("price-out")
       public Flowable<KafkaRecord<String, Record>> generate() throws IOException {
           Schema schema = new Schema.Parser().parse(
               new File(getClass().getClassLoader().getResource("price-schema.avsc").getFile())
           );
           return Flowable.interval(1000, TimeUnit.MILLISECONDS)
               .onBackpressureDrop()
               .map(tick -> {
                   Record record = new GenericData.Record(schema);
                   record.put("symbol", symbols[random.nextInt(4)]);
                   record.put("price", String.format("%.2f", random.nextDouble() * 100));
                   return KafkaRecord.of(record.get("symbol").toString(), record);
               });
       }
    }

    In this code, we instruct the reactive messaging extension to send items from the stream to the price-out channel through the @Outgoing annotation. The outgoing stream is an RxJava 2 Flowable that emits a new stock price every second.

    Working with schemas

    As you might have noticed, we need an Avro schema for this stock ticker to correctly format the message, so we will create a simple one under src/main/resources/price-schema.avsc with the following content:

    {
       "type": "record",
       "name": "price",
       "namespace": "com.redhat",
       "fields": [
           {
               "name": "symbol",
               "type": "string"
           },
           {
               "name": "price",
               "type": "string"
           }
       ]
    }

    In the previous file, we specified the symbol and the price fields to be included in the Avro record.

    We need to let the registry know that this is the schema we will be validating against every time we send a message to the Kafka topic. To achieve this, we will use the REST API provided by the registry to add the schema.

    First, we will create a new artifact of type AVRO by making a POST call to the API using cURL. Remove the whitespace and line breaks from the Avro schema file to get a canonical, single-line version:

    curl -X POST -H "Content-type: application/json; artifactType=AVRO" -H "X-Registry-ArtifactId: prices-value" --data '{"type":"record","name":"price","namespace":"com.redhat","fields":[{"name":"symbol","type":"string"},{"name":"price","type":"string"}]}' http://localhost:8081/api/artifacts -s | jq
    

    This call creates a new artifact with prices-value as its ID. The headers identify the schema as an AVRO artifact and indicate that we are using JSON as the payload type:

    {
      "createdOn": 1575919739708,
      "modifiedOn": 1575919739708,
      "id": "prices-value",
      "version": 1,
      "type": "AVRO",
      "globalId": 4
    }
    

    Configuration

    Next, we need to configure the Kafka connector. This is done in the application properties file, so open src/main/resources/application.properties and fill it in with the following configuration:

    # Configuration file
    kafka.bootstrap.servers=localhost:9092
     
    mp.messaging.outgoing.price-out.connector=smallrye-kafka
    mp.messaging.outgoing.price-out.client.id=price-producer
    mp.messaging.outgoing.price-out.topic=prices
    mp.messaging.outgoing.price-out.key.serializer=org.apache.kafka.common.serialization.StringSerializer
    mp.messaging.outgoing.price-out.value.serializer=io.apicurio.registry.utils.serde.AvroKafkaSerializer
     
    mp.messaging.outgoing.price-out.apicurio.registry.url=http://localhost:8081/api
    mp.messaging.outgoing.price-out.apicurio.registry.artifact-id=io.apicurio.registry.utils.serde.strategy.TopicIdStrategy

    In this file, we indicate that we will connect to the local Kafka cluster running on port 9092, and we configure the outgoing messaging channel price-out to use the smallrye-kafka connector with a StringSerializer for the key and the io.apicurio.registry.utils.serde.AvroKafkaSerializer class for the value.

    This configuration enables the Apicurio SerDe to manage access to the registry and validate the schema for our Avro record.

    The last two lines indicate where the registry is listening and the strategy used for schema lookup. In our example, we are using the TopicIdStrategy, which means the serializer looks for an artifact named after the Kafka topic we are sending messages to, suffixed with -key or -value (prices-value in our case).
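
    For completeness, the consuming side would mirror this setup. The following is only a sketch (the price-in channel name and its settings are my assumptions, not part of this article's project), using the matching Apicurio deserializer from the same library:

    # Consumer channel (sketch): deserialize Avro values using the schema from the registry
    mp.messaging.incoming.price-in.connector=smallrye-kafka
    mp.messaging.incoming.price-in.topic=prices
    mp.messaging.incoming.price-in.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    mp.messaging.incoming.price-in.value.deserializer=io.apicurio.registry.utils.serde.AvroKafkaDeserializer
    mp.messaging.incoming.price-in.apicurio.registry.url=http://localhost:8081/api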

    Running the application

    If you are ready, it is time to get the application running. To do that, you just need to run the following command:

    ./mvnw compile quarkus:dev
    

    You will see in the log that your application is now sending messages to Kafka:

    2019-12-09 14:30:58,007 INFO  [io.sma.rea.mes.ext.MediatorManager] (main) Initializing mediators
    2019-12-09 14:30:58,203 INFO  [io.sma.rea.mes.ext.MediatorManager] (main) Connecting mediators
    2019-12-09 14:30:58,206 INFO  [io.sma.rea.mes.ext.MediatorManager] (main) Connecting method com.redhat.kafka.registry.AvroRegistryExample#generate to sink price-out
    2019-12-09 14:30:58,298 INFO  [io.quarkus] (main) Quarkus 1.0.1.Final started in 1.722s. 
    2019-12-09 14:30:58,301 INFO  [io.quarkus] (main) Profile dev activated. Live Coding activated.
    2019-12-09 14:30:58,301 INFO  [io.quarkus] (main) Installed features: [cdi, smallrye-context-propagation, smallrye-reactive-messaging, smallrye-reactive-messaging-kafka, smallrye-reactive-streams-operators]
    2019-12-09 14:30:58,332 INFO  [org.apa.kaf.cli.Metadata] (kafka-producer-network-thread | price-producer) Cluster ID: B2U0Vs6eQS-kjJG3_L2tCA
    2019-12-09 14:30:59,309 INFO  [io.sma.rea.mes.kaf.KafkaSink] (RxComputationThreadPool-1) Sending message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@12021771 to Kafka topic 'prices'
    2019-12-09 14:31:00,083 INFO  [io.sma.rea.mes.kaf.KafkaSink] (vert.x-eventloop-thread-0) Message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@12021771 sent successfully to Kafka topic 'prices'
    2019-12-09 14:31:00,297 INFO  [io.sma.rea.mes.kaf.KafkaSink] (RxComputationThreadPool-1) Sending message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@5e5f389a to Kafka topic 'prices'
    2019-12-09 14:31:00,334 INFO  [io.sma.rea.mes.kaf.KafkaSink] (vert.x-eventloop-thread-0) Message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@5e5f389a sent successfully to Kafka topic 'prices'
    2019-12-09 14:31:01,301 INFO  [io.sma.rea.mes.kaf.KafkaSink] (RxComputationThreadPool-1) Sending message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@5a403106 to Kafka topic 'prices'
    2019-12-09 14:31:01,341 INFO  [io.sma.rea.mes.kaf.KafkaSink] (vert.x-eventloop-thread-0) Message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@5a403106 sent successfully to Kafka topic 'prices'
    2019-12-09 14:31:02,296 INFO  [io.sma.rea.mes.kaf.KafkaSink] (RxComputationThreadPool-1) Sending message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@3bb2aac0 to Kafka topic 'prices'
    2019-12-09 14:31:02,323 INFO  [io.sma.rea.mes.kaf.KafkaSink] (vert.x-eventloop-thread-0) Message io.smallrye.reactive.messaging.kafka.SendingKafkaMessage@3bb2aac0 sent successfully to Kafka topic 'prices'
    

    The messages you are sending to Kafka use the Apicurio serializer to validate the record schema against the Red Hat Integration service registry. If you want to take a closer look at the code and see how to implement the Incoming pattern for Quarkus, take a look at the full example in my amq-examples GitHub repository.
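
    As a rough idea of the consuming side, here is a minimal sketch (my own, not the code from that repository) of a method that could read prices from a price-in channel configured with the Apicurio Avro deserializer, as in the consumer properties sketched earlier:

    // Requires: import org.eclipse.microprofile.reactive.messaging.Incoming;
    @Incoming("price-in")
    public void consume(Record record) {
        // The Apicurio deserializer rebuilds the GenericData.Record using the schema
        // registered in the service registry, so the symbol and price fields are available
        System.out.println(record.get("symbol") + " -> " + record.get("price"));
    }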

    To ease the transition from Confluent, the service registry also provides compatibility with the Confluent Schema Registry REST API. This means that applications using Confluent client libraries can replace the Confluent Schema Registry and use the Red Hat Integration service registry instead.
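
    For instance, a producer built with the Confluent client libraries could keep using Confluent's Avro serializer and simply point it at the service registry. The snippet below is a sketch only; the Confluent-compatible endpoint path shown (/api/ccompat) is an assumption, so check the registry documentation for the exact URL:

    # Sketch: Confluent serializer talking to the service registry's compatibility API
    value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
    schema.registry.url=http://localhost:8081/api/ccompat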

    Summary

    The Red Hat Integration service registry is a central data store for schemas and API artifacts. Developers can query, create, read, update, and delete service artifacts, versions, and rules to govern the structure of their services. Developer teams can work with popular formats such as Avro and Protobuf schemas, as well as OpenAPI and AsyncAPI definitions. The service registry can also be used as a drop-in replacement for the Confluent Schema Registry with Apache Kafka clients, thanks to the included serializer and deserializer classes.

    See also:

    • Red Hat advances Debezium CDC connectors for Apache Kafka support to Technical Preview
    • Red Hat simplifies transition to open source Kafka with new service registry and HTTP bridge
    Last updated: July 1, 2020
