Kafka streams: Read from ALL partitions in every instance of an application

By : Mauricio Iriarte Val
Date : October 15 2020, 08:10 PM
To fix the issue, either change your input topic "data_in" to a single partition, or use a GlobalKTable to read the data from all partitions of the topic and then join your stream with it. With a GlobalKTable, your app instances no longer have to be in different consumer groups.
The code will look like this:
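import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

// getBuilder(), Data, theDataList and the SerDes holder are the application's own code.
private GlobalKTable<String, theDataList> globalStream() {

  // KStream of records from the data_in topic, using the String and theDataSerde deserializers
  KStream<String, Data> trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

  // By sending to another topic you force a repartition on that topic
  trashStream.to("new_data_in", Produced.with(Serdes.String(), SerDes.theDataSerde));

  KStream<String, Data> newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

  // Group by key; KGroupedStream is the intermediate representation needed for the aggregation
  KGroupedStream<String, Data> kgs = newTrashStream.groupByKey();

  Materialized<String, theDataList, KeyValueStore<Bytes, byte[]>> materialized = Materialized.as("agg-stream-store");
  materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

  // Aggregate into a KTable and publish it to agg_data_in, the topic the GlobalKTable reads
  kgs.aggregate(() -> new theDataList(), (key, value, aggregate) -> {
      // Invalid record: drop every entry that is not newer than it.
      // Valid records leave the list unchanged in this snippet.
      if (!value.getValideData())
          aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());
      return aggregate;
  }, materialized)
  .toStream()
  .to("agg_data_in", Produced.with(Serdes.String(), SerDes.theDataDataListSerde));

  // Every instance reads all partitions of agg_data_in
  return getBuilder().globalTable("agg_data_in", Consumed.with(Serdes.String(), SerDes.theDataDataListSerde));
}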

Kafka Streams - kafka-streams-application-reset.sh sending wrong API version

By : Tacti_Steve
Date : March 29 2020, 07:55 AM
Kafka Streams is based on version 0.10 and does not work with 0.9.
The same is true for the bin/kafka-streams-application-reset tool: it uses a 0.10 KafkaClient internally and requires a 0.10 broker.

Kafka Streams - Is it possible to run remote interactive queries without a local Kafka Streams instance

By : specops872
Date : March 29 2020, 07:55 AM
Is it possible to find the host/port pairs for all running instances of an application without also running a local Kafka Streams instance in the same consumer group? (I highlight running because I don't mind instantiating a dummy instance of the Kafka Streams app just to get the host/port metadata, but there is a validateRunning check that prevents me from doing so.)
Why don't you add a new REST API method to your (first) Kafka Streams application that exposes the currently active host/port pairs to your second app? The app instances would of course have this information readily available.
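A minimal sketch of such an endpoint, using only the JDK's built-in HTTP server. The class name `InstancesEndpoint`, the `/instances` path and the plain-text response format are assumptions made for illustration. In a real application the supplier would presumably be backed by the metadata the running `KafkaStreams` instance already holds (for example `metadataForAllStreamsClients()`, or `allMetadata()` on older versions), which requires `application.server` to be configured on each instance.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical helper; not part of Kafka Streams.
class InstancesEndpoint {

    // Renders the host/port pairs as one "host:port" entry per line.
    static String render(List<InetSocketAddress> instances) {
        return instances.stream()
                .map(a -> a.getHostString() + ":" + a.getPort())
                .collect(Collectors.joining("\n"));
    }

    // Starts an HTTP server that answers requests to /instances with the
    // pairs returned by the supplier at the time of the request.
    static HttpServer start(int port, Supplier<List<InetSocketAddress>> source) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/instances", exchange -> {
            byte[] body = render(source.get()).getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

The second app can then fetch `/instances` from any live instance of the first app and query the listed hosts directly, without joining the consumer group itself.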

How does Kafka Streams allocate partitions?

By : Confused SQL User
Date : March 29 2020, 07:55 AM
KTables are sharded according to the input partitions. Thus, as with a KStream, each instance gets topic-partitions assigned and materializes each of them as a shard of the KTable. Kafka Streams makes sure that partitions of different topics are co-located, i.e., one instance will be assigned topic-1 partition-0 and topic-2 partition-0 (and so forth).
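The co-location rule can be pictured with a toy model in plain Java. This is only a sketch: `CoLocationSketch` and `assign` are made-up names, and spreading partition numbers round-robin over instances is a simplification, not the assignor Kafka Streams actually uses. The point it illustrates is that the same partition number of every co-partitioned topic lands on the same instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of co-located assignment; not Kafka Streams' actual assignor.
class CoLocationSketch {

    // For each partition number, pick one instance (round-robin, a simplification)
    // and give it that partition of every topic.
    static Map<Integer, List<String>> assign(List<String> topics, int partitions, int instances) {
        Map<Integer, List<String>> byInstance = new TreeMap<>();
        for (int p = 0; p < partitions; p++) {
            int instance = p % instances;
            List<String> owned = byInstance.computeIfAbsent(instance, k -> new ArrayList<>());
            for (String topic : topics) {
                owned.add(topic + "-" + p);
            }
        }
        return byInstance;
    }

    public static void main(String[] args) {
        // Two topics with four partitions each, shared by two instances.
        System.out.println(assign(List.of("topic-1", "topic-2"), 4, 2));
    }
}
```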
If topic-2 has no key set, data will be randomly distributed across the topic's partitions. For this case, you can use a GlobalKTable instead. A GlobalKTable is a full replica of all partitions on every instance. If you do a KStream-GlobalKTable join, you can specify a "mapper" that extracts the join attribute from each stream record (i.e., you can extract the join attribute from the value) and uses it to look up the table entry.
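The idea behind the mapper can be sketched without the Kafka library. In the sketch below the GlobalKTable is modeled as a plain `Map` that every instance holds in full, and the mapper is a `Function` that pulls the lookup key out of the stream record's value. `GlobalJoinSketch`, `Order` and the field names are hypothetical. In the Streams DSL the corresponding call is `KStream#join(GlobalKTable, KeyValueMapper, ValueJoiner)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of a KStream-GlobalKTable join; all names are illustrative.
class GlobalJoinSketch {

    // Hypothetical stream value: the record has no usable key,
    // so the join attribute (customerId) lives in the value.
    static final class Order {
        final String customerId;
        final int amount;

        Order(String customerId, int amount) {
            this.customerId = customerId;
            this.amount = amount;
        }
    }

    // For each stream record, derive the lookup key with the mapper and join
    // against the full local copy of the table. Records without a match are
    // dropped, as in an inner join.
    static List<String> join(List<Order> stream,
                             Map<String, String> globalTable,
                             Function<Order, String> keyMapper) {
        List<String> joined = new ArrayList<>();
        for (Order order : stream) {
            String customerName = globalTable.get(keyMapper.apply(order));
            if (customerName != null) {
                joined.add(customerName + " ordered " + order.amount);
            }
        }
        return joined;
    }
}
```

Because every instance holds the whole table, the lookup works regardless of which partition the stream record arrived on.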

Kafka Streams: Internal topic partitions

By : user1460025
Date : March 29 2020, 07:55 AM
Kafka Streams will create the topic for you. And yes, it will create it with the same number of partitions as your input topic. During startup, Kafka Streams also checks whether the topic has the expected number of partitions and fails if it does not.
The internal topic is basically a regular topic like any other, and you can change its number of partitions via the command-line tools as for any other topic. However, this should never be required. Also note that dropping or adding partitions will mess up your state.

Can we lose messages in Kafka Streams if we add new partitions?

By : Alphanumerics
Date : March 29 2020, 07:55 AM
You won't lose data. However, depending on your application, adding partitions might not be supported and will break your application.
You can add partitions only if your application is stateless. If your application is stateful, it will most likely break and die with an exception.
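One reason stateful applications break is that key-based partitioning depends on the partition count, so a key whose state was built from one partition can start arriving on another. The sketch below illustrates this with a stand-in hash. Kafka's default partitioner hashes the serialized key with murmur2, so `String.hashCode()` is used here only as an assumption for illustration, and `RepartitionSketch` is a made-up name.

```java
// Stand-in for key-based partitioning: hash(key) mod numPartitions.
// String.hashCode() replaces Kafka's murmur2 purely for illustration.
class RepartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Compare where each key lands with 4 partitions and with 5.
        for (String key : new String[] {"a", "b", "d"}) {
            System.out.println(key + ": " + partitionFor(key, 4) + " -> " + partitionFor(key, 5));
        }
        // prints:
        // a: 1 -> 2
        // b: 2 -> 3
        // d: 0 -> 0
    }
}
```

Keys "a" and "b" move to a different partition after the change, while the state previously built for them stays in the shard of the old partition.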
