Deserialize encrypted kafka message in consumer


By : bogdan
Date : October 18 2020, 08:10 PM
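I think the issue was the following:
If I set value.deserializer in the consumer config to "KafkaAvroDeserializer.class", I don't see any messages being consumed.
One approach is to consume the raw bytes with ByteBufferDeserializer and pick a deserializer per record by checking the first byte.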
code :
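// consumerConfig.put("value.deserializer", ByteBufferDeserializer.class);
// Needs: java.nio.ByteBuffer, java.util.HashMap, java.util.Map,
//        io.confluent.kafka.serializers.KafkaAvroDeserializer,
//        org.apache.kafka.common.serialization.StringDeserializer

ByteBuffer buf = record.value();

if (buf == null) {
    System.err.println("Tombstoned record");
} else {
    // Copy the payload so the deserializers see the whole message, magic byte included
    byte[] value = new byte[buf.remaining()];
    buf.duplicate().get(value);

    if (value.length >= 5 && value[0] == 0x0) { // Check for Avro (Confluent magic byte)
        int schemaId = ByteBuffer.wrap(value, 1, 4).getInt();  // If you wanted it

        // In real code, create and configure the deserializer once, outside the poll loop
        KafkaAvroDeserializer d = new KafkaAvroDeserializer();
        Map<String, String> config = new HashMap<>();
        config.put("schema.registry.url", "http://..."); // address to registry
        boolean isKey = false;
        d.configure(config, isKey);

        Object v = d.deserialize(record.topic(), value); // cast to your Avro type
        // TODO: Handle record
    } else {
        try {
            StringDeserializer d = new StringDeserializer();
            String s = d.deserialize(record.topic(), value);
            // TODO: Handle record
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
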
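The magic-byte check above relies on the Confluent wire format: one zero byte, then a 4-byte big-endian schema ID, then the Avro-encoded body. A minimal sketch of that header parsing, using only the JDK, might look like the following. The class name WireFormatHeader and its methods are hypothetical helpers for illustration, not part of any Kafka or Confluent library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of Kafka or Confluent libraries.
final class WireFormatHeader {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** True when the payload is long enough and starts with the magic byte. */
    static boolean looksLikeConfluentAvro(byte[] payload) {
        return payload != null
                && payload.length >= HEADER_SIZE
                && payload[0] == MAGIC_BYTE;
    }

    /** Reads the big-endian schema ID stored in bytes 1 to 4. */
    static int schemaId(byte[] payload) {
        requireHeader(payload);
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    /** Returns a copy of the bytes that follow the 5-byte header. */
    static byte[] body(byte[] payload) {
        requireHeader(payload);
        return Arrays.copyOfRange(payload, HEADER_SIZE, payload.length);
    }

    private static void requireHeader(byte[] payload) {
        if (!looksLikeConfluentAvro(payload)) {
            throw new IllegalArgumentException("Payload is not in Confluent wire format");
        }
    }

    public static void main(String[] args) {
        byte[] avro = {0, 0, 0, 0, 42, 10, 20};
        byte[] text = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(looksLikeConfluentAvro(avro)); // true
        System.out.println(schemaId(avro));               // 42
        System.out.println(looksLikeConfluentAvro(text)); // false
    }
}
```

Note that the check is only a heuristic: an encrypted or binary payload can start with a zero byte by coincidence, so a failed Avro deserialization should still be handled.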



Issue in deserialize protostuff object in Kafka Consumer


By : Victor Leonardo Cord
Date : March 29 2020, 07:55 AM
I found the solution. The issue was in toBytes and fromBytes: the object needs to be converted to byte[] using the ProtostuffIOUtil.toByteArray method.
Serialization
code :
// Serialize Foo to a byte[] with protostuff
public static byte[] toBytes(Foo o)
{
  LinkedBuffer buffer = LinkedBuffer.allocate(1024*1024);
  Schema<Foo> schema = Foo.getSchema();
  return ProtostuffIOUtil.toByteArray(o, schema, buffer);
}

// Rebuild Foo by merging the bytes into a fresh instance
public static Foo fromBytes(byte[] bytes)
{
  Foo tmp = Foo.getSchema().newMessage();
  ProtostuffIOUtil.mergeFrom(bytes, tmp, Foo.getSchema());
  return tmp;
}

Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-time


By : Dilip Das
Date : March 29 2020, 07:55 AM
I was able to solve the problem myself.
The library "com.typesafe.akka" %% "akka-stream-kafka" only works with Kafka 0.10 and later; it does not work with earlier versions of Kafka. When I listed the Kafka jars on my Kafka server, I found that I am using Cloudera 5.7.1, which comes with Kafka 0.9. The code below uses reactive-kafka instead.
code :
// build.sbt dependency:
"com.softwaremill.reactivekafka" % "reactive-kafka-core_2.11" % "0.10.0"

// Consumer code:
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  bootstrapServers = "foo:9092",
  topic = "my-topic",
  groupId = "abhi",
  valueDeserializer = new StringDeserializer()
)

val source = Source.fromPublisher(kafka.consume(consumerProperties))
val flow = Flow[ConsumerRecord[Array[Byte], String]].map(r => r.value())
val sink = Sink.foreach[String](println)
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) {implicit builder =>
 s =>
   import GraphDSL.Implicits._
    source ~> flow ~> s.in
   ClosedShape
})
val future = graph.run()
future.onComplete{_ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)

Python kafka: Is there a way to block a consumer on a kafka topic till a new message is posted?


By : Almantas
Date : March 29 2020, 07:55 AM
Polling in an infinite loop is what Kafka: The Definitive Guide suggests as well. Here is a Java excerpt from Chapter 4, "Kafka Consumers: Reading Data from Kafka", followed by the same idea in Python with kafka-python and with confluent-kafka:
code :
// Java, from the book
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        ...
    }
}

# Python, kafka-python
from kafka import KafkaConsumer
...
kafka_consumer = KafkaConsumer(
...
)
kafka_consumer.subscribe([topic])

running = True
while running:
    message = kafka_consumer.poll()
...

# Python, confluent-kafka
from confluent_kafka import Consumer, KafkaError
...
c = Consumer(settings)

c.subscribe(['mytopic'])

try:
    while True:
        msg = c.poll(0.1)
...

Kafka Consumer is not able to deserialize timewindowed key which has start and end time


By : user2748647
Date : March 29 2020, 07:55 AM
It seems you are hitting https://issues.apache.org/jira/browse/KAFKA-7110
It is fixed in 2.2.0, which allows you to pass the window size into the constructor or TimeWindows.

Encrypted traffic from Kafka to .NET Consumer


By : user3680232
Date : March 29 2020, 07:55 AM
TLS should work fine. Otherwise, you would write your own Serializer / Deserializer classes to do the encryption.
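For the second option, the encryption step of such a Serializer / Deserializer pair could be sketched as below, in Java with only the JDK. This is an illustration under assumptions, not a vetted design: the class name AesGcmCodec, the choice of AES-GCM, and the message layout (12-byte IV followed by the ciphertext) are all hypothetical, and key distribution is left out entirely. A .NET client would need an equivalent implementation that agrees on the same layout.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Hypothetical helper: name and message layout are illustrative assumptions.
final class AesGcmCodec {
    private static final int IV_BYTES = 12;  // IV size commonly used with GCM
    private static final int TAG_BITS = 128; // authentication tag length

    private final SecretKey key;
    private final SecureRandom random = new SecureRandom();

    AesGcmCodec(SecretKey key) {
        this.key = key;
    }

    /** Would be called from a custom Serializer after producing the plain bytes. */
    byte[] encrypt(byte[] plain) throws GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        random.nextBytes(iv); // fresh IV for every message
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] sealed = cipher.doFinal(plain);
        // Prefix the IV so the consumer can initialise its cipher
        return ByteBuffer.allocate(IV_BYTES + sealed.length).put(iv).put(sealed).array();
    }

    /** Would be called from a custom Deserializer before decoding the payload. */
    byte[] decrypt(byte[] message) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key,
                new GCMParameterSpec(TAG_BITS, message, 0, IV_BYTES));
        return cipher.doFinal(message, IV_BYTES, message.length - IV_BYTES);
    }

    public static void main(String[] args) throws GeneralSecurityException {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(128);
        AesGcmCodec codec = new AesGcmCodec(generator.generateKey());

        byte[] wire = codec.encrypt("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(codec.decrypt(wire), StandardCharsets.UTF_8));
    }
}
```

Because GCM authenticates the ciphertext, decrypt throws an exception when the message was altered or the wrong key is used, which the consumer should treat as an undeliverable record.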