Deserialize encrypted kafka message in consumer

By : bogdan
Date : October 18 2020, 08:10 PM
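If value.deserializer is set to KafkaAvroDeserializer.class in the consumer config, no messages are consumed. A workaround is to configure ByteBufferDeserializer instead and pick the real deserializer per record: a leading 0x0 byte marks an Avro payload, and anything else is treated as a string.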
code :
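// consumerConfig.put("value.deserializer", ByteBufferDeserializer.class);

ByteBuffer buf = record.value();

if (buf == null) {
    System.err.println("Tombstoned record");
} else {
    // Copy the payload out without moving the original buffer's position
    byte[] bytes = new byte[buf.remaining()];
    buf.duplicate().get(bytes);

    if (bytes.length > 4 && bytes[0] == 0x0) { // Check for Avro
        int schemaId = ByteBuffer.wrap(bytes, 1, 4).getInt();  // If you wanted it

        KafkaAvroDeserializer d = new KafkaAvroDeserializer();
        Map<String, String> config = new HashMap<>();
        config.put("schema.registry.url", "http://..."); // address to registry
        boolean isKey = false;
        d.configure(config, isKey);

        // deserialize() takes the topic name and the raw bytes
        Object v = d.deserialize(record.topic(), bytes);
        // TODO: Handle record
    } else {
        try {
            StringDeserializer d = new StringDeserializer();
            String s = d.deserialize(record.topic(), bytes);
            // TODO: Handle record
        } catch (Exception e) {
            // TODO: Handle deserialization failure
        }
    }
}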

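The magic-byte check above relies on the framing that Confluent's Avro serializer puts in front of each payload: one zero byte, then a 4-byte big-endian schema ID, then the Avro body. A minimal sketch of that split, in Python with only the standard library; the function name split_confluent_frame is made up for illustration and is not part of any Kafka client:

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed Avro payload


def split_confluent_frame(payload: bytes):
    """Return (schema_id, avro_body) for a framed payload, or None otherwise."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        return None
    # ">i" is a big-endian signed 32-bit integer, like ByteBuffer.getInt()
    (schema_id,) = struct.unpack(">i", payload[1:5])
    return schema_id, payload[5:]


framed = bytes([0]) + struct.pack(">i", 42) + b"avro-bytes"
print(split_confluent_frame(framed))         # (42, b'avro-bytes')
print(split_confluent_frame(b"plain text"))  # None
```

Note that a non-Avro payload that happens to start with a zero byte would also pass this check, so the fallback branch still needs error handling.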

Issue in deserialize protostuff object in Kafka Consumer

By : Victor Leonardo Cord
Date : March 29 2020, 07:55 AM
I found the solution. The issue is in toBytes and fromBytes: the object needs to be converted to byte[] using the ProtostuffIOUtil.toByteArray method.
code :
public static byte[] toBytes(Foo o) {
  // Scratch buffer used by protostuff while writing the message
  LinkedBuffer buffer = LinkedBuffer.allocate(1024 * 1024);
  Schema<Foo> schema = Foo.getSchema();
  return ProtostuffIOUtil.toByteArray(o, schema, buffer);
}

public static Foo fromBytes(byte[] bytes) {
  // Create an empty message and merge the serialized fields into it
  Foo tmp = Foo.getSchema().newMessage();
  ProtostuffIOUtil.mergeFrom(bytes, tmp, Foo.getSchema());
  return tmp;
}

Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-time

By : Dilip Das
Date : March 29 2020, 07:55 AM
I was able to solve the problem myself.
The library "com.typesafe.akka" %% "akka-stream-kafka" only works with Kafka 0.10 and later. It does not work with earlier versions of Kafka. When I listed the Kafka jars on my Kafka server, I found that I am using Cloudera 5.7.1, which comes with Kafka 0.9. The code below uses the reactive-kafka-core dependency instead.
code :
// build.sbt dependency
"com.softwaremill.reactivekafka" % "reactive-kafka-core_2.11" % "0.10.0"

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
import actorSystem.dispatcher // ExecutionContext for future.onComplete

val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  bootstrapServers = "foo:9092",
  topic = "my-topic",
  groupId = "abhi",
  valueDeserializer = new StringDeserializer()
)

val source = Source.fromPublisher(kafka.consume(consumerProperties))
val flow = Flow[ConsumerRecord[Array[Byte], String]].map(r => r.value())
val sink = Sink.foreach[String](println)
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit builder =>
  s =>
    import GraphDSL.Implicits._
    source ~> flow ~> s.in
    ClosedShape
})
val future = graph.run()
future.onComplete { _ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)

Python kafka: Is there a way to block a consumer on a kafka topic till a new message is posted?

By : Almantas
Date : March 29 2020, 07:55 AM
Polling in an infinite loop is what Kafka: The Definitive Guide suggests as well. Here is a Java excerpt from Chapter 4, "Kafka Consumers: Reading Data from Kafka", using the same idea, followed by the equivalent loops with kafka-python and confluent-kafka:
code :
// Java
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // ...
    }
} finally {
    consumer.close();
}

# Python, kafka-python
from kafka import KafkaConsumer
kafka_consumer = KafkaConsumer(
    # ...
)

running = True
while running:
    message = kafka_consumer.poll()

# Python, confluent-kafka
from confluent_kafka import Consumer, KafkaError
c = Consumer(settings)

while True:
    msg = c.poll(0.1)
    # ...
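
To make the poll-with-timeout idea concrete without a broker, here is a small sketch that runs on its own. InMemoryConsumer is a hypothetical stand-in built on queue.Queue, not a Kafka client class, and the "STOP" token is only there so the demo terminates; a real consumer loop would typically run until shutdown.

```python
import queue
import threading


class InMemoryConsumer:
    """Hypothetical stand-in for a Kafka consumer, backed by queue.Queue."""

    def __init__(self):
        self._messages = queue.Queue()

    def publish(self, message):
        self._messages.put(message)

    def poll(self, timeout):
        """Block for up to `timeout` seconds; return a message or None."""
        try:
            return self._messages.get(timeout=timeout)
        except queue.Empty:
            return None


def consume_until(consumer, stop_token, timeout=0.1):
    """Poll in a loop, collecting messages until `stop_token` arrives."""
    received = []
    while True:
        message = consumer.poll(timeout)
        if message is None:
            continue  # nothing arrived within the timeout; poll again
        if message == stop_token:
            return received
        received.append(message)


consumer = InMemoryConsumer()
# Publish a few messages from another thread after a short delay
producer = threading.Timer(
    0.3, lambda: [consumer.publish(m) for m in ("a", "b", "STOP")]
)
producer.start()
print(consume_until(consumer, "STOP"))  # ['a', 'b']
producer.join()
```

The loop never spins: each poll call blocks until a message arrives or the timeout expires, which is the same trade-off the poll timeout controls in the client libraries above.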

Kafka Consumer is not able to deserialize timewindowed key which has start and end time

By : user2748647
Date : March 29 2020, 07:55 AM
It seems you are hitting https://issues.apache.org/jira/browse/KAFKA-7110.
It's fixed in 2.2.0, which allows you to pass the window size into the constructor or TimeWindows.
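
Why the window size matters can be shown with a small sketch. It assumes, as far as I understand the Kafka Streams format, that a time-windowed key is written as the serialized key bytes followed by an 8-byte big-endian window start timestamp, with no end time in the payload. The function name decode_windowed_key and the sample values are made up for illustration.

```python
import struct

TIMESTAMP_SIZE = 8  # bytes used for the window start timestamp


def decode_windowed_key(data: bytes, window_size_ms: int):
    """Split `data` into (key_bytes, window_start_ms, window_end_ms)."""
    if len(data) < TIMESTAMP_SIZE:
        raise ValueError("payload too short to contain a window start timestamp")
    key_bytes = data[:-TIMESTAMP_SIZE]
    (start_ms,) = struct.unpack(">q", data[-TIMESTAMP_SIZE:])
    # The end is not in the payload; it has to be derived from the window size.
    return key_bytes, start_ms, start_ms + window_size_ms


encoded = b"user-1" + struct.pack(">q", 1_600_000_000_000)
print(decode_windowed_key(encoded, window_size_ms=60_000))
# (b'user-1', 1600000000000, 1600000060000)
```

Under that assumption, a deserializer that is not told the window size has no way to compute a meaningful end time.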

Encrypted traffic from Kafka to .NET Consumer

By : user3680232
Date : March 29 2020, 07:55 AM
TLS should work fine. Otherwise, you'd write your own Serializer / Deserializer classes to do encryption.
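
The second option can be sketched as a wrapper that applies a byte transform around an ordinary serializer pair. This is only an outline of the structure, in Python rather than .NET: TransformingSerde is a hypothetical class, and the demo plugs in base64 as a placeholder, which is an encoding and NOT encryption. In practice the encrypt/decrypt callables would come from a real authenticated cipher, with keys managed outside the code.

```python
import base64
import json


class TransformingSerde:
    """Wraps a serialize/deserialize pair with an outer byte transform."""

    def __init__(self, serialize, deserialize, encrypt, decrypt):
        self._serialize = serialize
        self._deserialize = deserialize
        self._encrypt = encrypt
        self._decrypt = decrypt

    def serialize(self, value):
        # Producer side: serialize first, then transform the bytes
        return self._encrypt(self._serialize(value))

    def deserialize(self, data):
        if data is None:  # tombstone record, nothing to decrypt
            return None
        # Consumer side: undo the transform, then deserialize
        return self._deserialize(self._decrypt(data))


# PLACEHOLDER transform: base64 is an encoding, NOT encryption.
serde = TransformingSerde(
    serialize=lambda v: json.dumps(v).encode("utf-8"),
    deserialize=lambda b: json.loads(b.decode("utf-8")),
    encrypt=base64.b64encode,
    decrypt=base64.b64decode,
)
wire = serde.serialize({"id": 7})
print(serde.deserialize(wire))  # {'id': 7}
```

Unlike TLS, which only protects traffic in transit, this approach keeps the payload opaque on the broker as well, at the cost of distributing keys to every producer and consumer.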
