Kafka Avro Serializer and deserializer exception. Avro supported types

By : user2174232
Date : October 18 2020, 08:10 PM
TweetInfoDto cannot be a plain Java object that you have defined yourself.
Ideally, it should be generated from an Avro schema, for example via the Avro Maven Plugin.

Kafka Avro Serializer: org.apache.avro.AvroRuntimeException: not open

By : Filink Cao
Date : March 29 2020, 07:55 AM
Kafka receives a key-value pair to serialize. You are passing it a DataFileWriter, which is not the value you want to serialize, so it will not work.
What you need to do is serialize the Avro record into a byte array via a BinaryEncoder and a ByteArrayOutputStream, and then pass that array to the ProducerRecord:
code :
SpecificDatumWriter<Customer> writer = new SpecificDatumWriter<>(schema);
ByteArrayOutputStream os = new ByteArrayOutputStream();

try {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(os, null);
  writer.write(customer1, encoder);
  // binaryEncoder() returns a buffering encoder: flush before reading the bytes
  encoder.flush();

  byte[] avroBytes = os.toByteArray();
  ProducerRecord<String, byte[]> record1 =
    new ProducerRecord<>("CustomerCountry", "Customer One", avroBytes);

} finally {
  os.close();
}
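To make it clearer what ends up in that byte array, here is a minimal sketch of Avro's binary encoding written with the JDK only. It is not the Avro library's code. It assumes a hypothetical record with an int field followed by a string field (the real Customer schema is not shown in this thread), and the class and method names are made up for illustration. Per the Avro specification, ints and longs are written as zigzag varints, strings as a length followed by UTF-8 bytes, and record fields are concatenated in schema order.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class AvroBinarySketch {
    // Avro writes int/long values as zigzag-encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zigzag: small magnitudes get short codes
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 payload bits + continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro writes a string as its UTF-8 length (a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Hypothetical record {id: int, name: string}: fields in schema order.
    static byte[] encodeCustomer(int id, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLong(out, id);
        writeString(out, name);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeCustomer(1, "Al");
        StringBuilder hex = new StringBuilder();
        for (byte b : bytes) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim()); // prints "02 04 41 6c"
    }
}
```

Note that the result carries no schema and no file header, which is why a DataFileWriter (which writes a container file) is the wrong tool for a single Kafka message value.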

Kafka Consumer for Spark written in Scala for Kafka API 0.10: custom AVRO deserializer

By : Bichitra K Satapathy
Date : March 29 2020, 07:55 AM
It is possible. You need to implement the Deserializer interface defined in org.apache.kafka.common.serialization, and point key.deserializer or value.deserializer at your custom class in the Kafka parameters held by the ConsumerStrategy[K, V]. For example:
code :
import java.util
import org.apache.kafka.common.serialization.Deserializer

class AvroDeserializer extends Deserializer[Array[Byte]] {
  // Nothing to configure or release in this skeleton
  override def configure(map: util.Map[String, _], b: Boolean): Unit = ()
  override def close(): Unit = ()
  // Put the actual Avro decoding here
  override def deserialize(s: String, bytes: Array[Byte]): Array[Byte] = ???
}

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import my.location.with.AvroDeserializer

val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[AvroDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("sometopic")
// MyTypeWithAvroDeserializer stands for the type your deserializer returns
// (Array[Byte] for the skeleton above)
val stream = KafkaUtils.createDirectStream[String, MyTypeWithAvroDeserializer](
  ssc,
  PreferConsistent,
  Subscribe[String, MyTypeWithAvroDeserializer](topics, kafkaParams)
)

How to write Junit Test Case for KafkaStreams with Avro Deserializer and Avro Serializer

By : Purbasha Das
Date : March 29 2020, 07:55 AM
To work around this issue, I start an Avro Schema Registry with each test, each on a distinct port, like this:
code :
@Test
public void testKafkaWithAvro() {
    try {
        Process process = Runtime.getRuntime().exec("schema-registry-start ./schema-registry.properties");

        final String TEST_URL = "http://localhost:8181";
        AvroValue avroValue = new AvroValue();

        // Start creating and configuring the stream processing
        StreamsBuilder builder = new StreamsBuilder();

        myConfigFactory myConfigFactory = new myConfigFactory();

        myConfig myConfig = myConfigFactory.build();

        // call the code to be tested
        myBuilder myBuilder = new myBuilder(myConfig);

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-unit-test");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        props.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, TEST_URL);
        this.config = new StreamsConfig(props);

        // Create the topology to start testing
        Topology topology = builder.build();
        ProcessorTopologyTestDriver driver =
                new ProcessorTopologyTestDriver(this.config, topology);

        final Map<String, String> serdeConfig = new HashMap<>();
        serdeConfig.put("schema.registry.url", TEST_URL);
        serdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");

        Serde<AvroKey> keyAvroSerde = new SpecificAvroSerde<>();
        keyAvroSerde.configure(serdeConfig, true); // `true` for record keys
        final Serde<AvroValue> valueAvroSerde = new SpecificAvroSerde<>();
        valueAvroSerde.configure(serdeConfig, false); // `false` for record values

        // Run a test with something that should pass the filter
        driver.process(myConfigFactory.getFirstTopic(), testKey,
                testValue, keyAvroSerde.serializer(),
                valueAvroSerde.serializer());

        ProducerRecord<myKey, dbo_my> recordPassesFilter =

        ProducerRecord<OutAvroKey, OutAvroValue> recordOut =
                new ProducerRecord<>(myConfigFactory.getOutTopic(), null, 0L,
                        outKey, outValue);

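        assertEquals("The filtered output keys didn't match", recordOut.key(),
                recordPassesFilter.key());
        assertEquals("The filtered output values didn't match", recordOut.value(),
                recordPassesFilter.value());
    } catch (Exception e) {
        fail("Unexpected exception: " + e.getMessage());
    }
}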
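Giving each test its own port can be done by asking the operating system for a free one. The following is only a sketch using the JDK: the class and method names are hypothetical, and the chosen port would still have to be written into the registry's properties file before the process is started. There is also a small race, since another process could grab the port between the moment it is released and the moment the registry binds to it.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePort {
    // Binding to port 0 lets the OS pick an unused ephemeral port;
    // the socket is closed again so that the port can be reused.
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    public static void main(String[] args) throws IOException {
        int port = findFreePort();
        // This URL would take the place of a hard-coded registry URL in the test
        String registryUrl = "http://localhost:" + port;
        System.out.println(registryUrl);
    }
}
```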

kafka Avro message deserializer for multiple topics

By : Chris Mueller
Date : March 29 2020, 07:55 AM
I am trying to deserialize Kafka messages in Avro format, using the following code: https://github.com/ivangfr/springboot-kafka-debezium-ksql/blob/master/kafka-research-consumer/src/main/java/com/mycompany/kafkaresearchconsumer/kafka/ReviewsConsumerConfig.java
Use the line below in your configuration:
code :
props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );
props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );
props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroWithSchemaDeserializer.class );
package com.moglix.netsuite.kafka;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ReviewsConsumerConfig
{
    @Value( "${kafka.bootstrap-servers}" )
    private String bootstrapServers;

    @Value( "${kafka.schema-registry-url}" )
    private String schemaRegistryUrl;

    @Value( "${kafka.reviews.start-offset}" )
    private String orderStartOffset;

    @Value( "${kafka.reviews.max-poll-records}" )
    private Integer maxPollRecords;

    @Bean
    public <T> ConcurrentKafkaListenerContainerFactory<String, T> kafkaListenerContainerFactory()
    {
        ConcurrentKafkaListenerContainerFactory<String, T> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory1() );
        factory.setBatchListener( true );
        factory.getContainerProperties().setAckMode( ContainerProperties.AckMode.MANUAL );
        factory.getContainerProperties().setSyncCommits( true );
        return factory;
    }

    @Bean
    public <T> ConsumerFactory<String, T> consumerFactory1()
    {
        return new DefaultKafkaConsumerFactory<>( consumerConfigs1() );
    }

    @Bean
    public Map<String, Object> consumerConfigs1()
    {
        Map<String, Object> props = new HashMap<>();
        props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers );
        props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class );
        props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName() );
        props.put( ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, orderStartOffset );

        props.put( AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl );
        props.put( KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true );
        // This is the main line that solved my problem
        props.put( KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName() );
        //props.put( SpecificAvroWithSchemaDeserializer.AVRO_VALUE_RECORD_TYPE, mysql.researchdb.institutes.Value.class );

        props.put( ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords );
        props.put( ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false );
        return props;
    }
}
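For context on what the subject name strategy changes, here is a small sketch of how the three strategies shipped with the Confluent serializers derive the Schema Registry subject for a record value. It only mirrors the documented naming rules using plain strings. It is not the library's code, and the topic and record names are hypothetical.

```java
public class SubjectNames {
    // TopicNameStrategy (the default): one subject per topic and key/value side.
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the record's fully qualified name, whatever the topic.
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: "<topic>-<fully qualified record name>".
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }

    public static void main(String[] args) {
        String topic = "reviews";             // hypothetical topic
        String record = "com.example.Review"; // hypothetical record name
        System.out.println(topicName(topic, false));        // reviews-value
        System.out.println(recordName(record));             // com.example.Review
        System.out.println(topicRecordName(topic, record)); // reviews-com.example.Review
    }
}
```

With the default strategy every record type written to a topic shares one subject, whereas TopicRecordNameStrategy gives each record type in each topic its own subject.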


Confluent Kafka Avro deserializer for spring boot kafka listener

By : Debby
Date : March 29 2020, 07:55 AM
Here is my answer, which I implemented based on "io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer".
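As background for any deserializer built on that class: messages produced by the Confluent Avro serializer are framed as one magic byte (0), a 4-byte big-endian schema ID, and then the Avro binary payload. The sketch below splits such a message using the JDK only. The class and method names are hypothetical, and a real deserializer would go on to fetch the writer schema for that ID from the Schema Registry and decode the payload with it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    // Reads the schema ID from the header of a framed message.
    static int schemaId(byte[] message) {
        if (message.length < HEADER_SIZE || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a Confluent-framed Avro message");
        }
        // ByteBuffer is big-endian by default, matching the wire format
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    // Returns the Avro-encoded bytes that follow the header.
    static byte[] avroPayload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_SIZE, message.length);
    }

    public static void main(String[] args) {
        // Magic byte, schema ID 42, then two payload bytes
        byte[] message = {0, 0, 0, 0, 42, 0x02, 0x04};
        System.out.println(schemaId(message));           // prints 42
        System.out.println(avroPayload(message).length); // prints 2
    }
}
```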