Kafka Streams to topic












I need to calculate an average with Kafka Streams. The producer writes Avro, so I have to deserialize with Avro as well; what I receive is a GenericRecord containing a JSON string that I then have to process.



I use a user-defined type as a helper to hold the running state.



    private class Tuple {

        public int occ; // number of values seen so far
        public int sum; // running total of the values

        public Tuple(int occ, int sum) {
            this.occ = occ;
            this.sum = sum;
        }

        public void sum(int toAdd) {
            this.sum += toAdd;
            this.occ++;
        }

        public Double getAverage() {
            // cast first so the division is not truncated to an int
            return (double) this.sum / this.occ;
        }

        @Override
        public String toString() {
            return "occurrences: " + this.occ + ", sum: " + sum + ", average -> " + getAverage();
        }

    }
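A side note on the average itself: it has to be computed in floating point, because dividing two `int` fields directly drops the fraction before the result is widened. A minimal sketch of the difference, where `AverageDemo` is a hypothetical helper and not part of the code above:

```java
// Hypothetical helper for illustration only.
class AverageDemo {
    // Integer division: the fractional part is discarded before widening.
    static double truncatedAverage(int sum, int occ) {
        return sum / occ;
    }

    // Cast one operand first so the division happens in floating point.
    static double exactAverage(int sum, int occ) {
        return (double) sum / occ;
    }

    public static void main(String[] args) {
        System.out.println(truncatedAverage(7, 2)); // 3.0
        System.out.println(exactAverage(7, 2));     // 3.5
    }
}
```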


Now the processing:



    StreamsBuilder builder = new StreamsBuilder();
    KStream<GenericRecord, GenericRecord> source =
            builder.stream(topic);

    // branch() returns one stream per predicate
    KStream<GenericRecord, GenericRecord>[] branches = source.branch(
            (key, value) -> partition(value.toString()),
            (key, value) -> true
    );

    KGroupedStream<String, String> groupedStream = branches[0]
            .mapValues(value -> createJson(value.toString()))
            .map((key, value) -> KeyValue.pair("T_DUR_CICLO", value.getNumberValue("payload", "T_DUR_CICLO")))
            .groupByKey(Serialized.with(
                    Serdes.String(),   /* key (note: type was modified) */
                    Serdes.String())); /* value */

    branches[0].foreach((key, value) -> System.out.println(key + " " + value));

    KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
            () -> new Tuple(0, 0), // initializer
            (aggKey, newValue, aggValue) -> new Tuple(aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
            // note: with(...) is static and returns a new Materialized, so the name given to as(...) is not applied
            Materialized.as("aggregate-state-store").with(Serdes.String(), new MySerde()));

    aggregatedStream
            .toStream()
            .foreach((key, value) -> System.out.println(key + ": " + value));

    KStream<String, Double> average = aggregatedStream
            .mapValues(v -> v.getAverage())
            .toStream();


The problem appears when I try to write the stream to a topic with:



 average.to("average");


Here is the exception:



  Exception in thread "streamtest-6d743b83-ce22-435e-aee5-76a745ce3571-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000007
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:202)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:420)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:394)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:382)
at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1042)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer / value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Double). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:106)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 12 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:39)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 41 more
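
Reading the trace from the bottom up: the sink created by `to("average")` has no serdes of its own, so the application's default serdes (here `GenericAvroSerializer`) are applied to a `String` key and a `Double` value. As the message suggests, serdes can be supplied per call; in Kafka Streams that would typically be `average.to("average", Produced.with(Serdes.String(), Serdes.Double()))`. The sketch below reproduces only the mechanism in plain Java. `MiniSerializer`, `RecordLike` and `SerdeMismatchDemo` are hypothetical stand-ins, not Kafka classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a serializer interface (not the Kafka one).
interface MiniSerializer<T> {
    byte[] serialize(T data);
}

// Hypothetical stand-in for GenericRecord.
final class RecordLike {
    final String json;
    RecordLike(String json) { this.json = json; }
}

class SerdeMismatchDemo {
    // The "default" serializer only understands RecordLike values.
    static final MiniSerializer<RecordLike> DEFAULT =
            r -> r.json.getBytes(StandardCharsets.UTF_8);

    // A serializer that matches the actual value type.
    static final MiniSerializer<Double> DOUBLE =
            d -> ByteBuffer.allocate(Double.BYTES).putDouble(d).array();

    // Mimics a sink: use the explicit serializer if given, else the default.
    @SuppressWarnings("unchecked")
    static byte[] send(Object value, MiniSerializer<?> explicit) {
        MiniSerializer<?> chosen = (explicit != null) ? explicit : DEFAULT;
        MiniSerializer<Object> serializer = (MiniSerializer<Object>) chosen;
        return serializer.serialize(value);
    }

    // True when the default serializer rejects the value with a ClassCastException.
    static boolean failsWithDefault(Object value) {
        try {
            send(value, null);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithDefault(3.5));     // true
        System.out.println(send(3.5, DOUBLE).length);  // 8
    }
}
```

The same reasoning applies to any operator that writes records: whenever the key or value type differs from the configured defaults, the serdes have to be passed explicitly at that point.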


----- EDIT ------
I have added the classes used for serialization and deserialization.



serializer:



    private class TupleSerializer implements Serializer<Tuple> {

        @Override
        public void configure(Map<String, ?> map, boolean bln) {
        }

        @Override
        public byte[] serialize(String string, Tuple t) {
            // two 4-byte ints: occ first, then sum
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4);
            buffer.putInt(t.occ);
            buffer.putInt(t.sum);
            return buffer.array();
        }

        @Override
        public void close() {
        }

    }


deserializer:



    private class TupleDeserializer implements Deserializer<Tuple> {

        @Override
        public void configure(Map<String, ?> map, boolean bln) {
        }

        @Override
        public void close() {
        }

        @Override
        public Tuple deserialize(String string, byte[] bytes) {
            // read the fields back in the order the serializer wrote them
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            int occ = buffer.getInt();
            int sum = buffer.getInt();
            return new Tuple(occ, sum);
        }

    }
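
The byte layout used by the serializer and deserializer can be checked without Kafka. A minimal round-trip sketch under that assumption, where `TupleCodec` is a hypothetical stand-alone class that only mirrors the two `ByteBuffer` steps above:

```java
import java.nio.ByteBuffer;

// Hypothetical stand-alone check, not part of the question's code.
class TupleCodec {
    // Writes occ, then sum, as two big-endian 4-byte ints.
    static byte[] encode(int occ, int sum) {
        return ByteBuffer.allocate(Integer.BYTES * 2).putInt(occ).putInt(sum).array();
    }

    // Reads the two ints back in the same order; returns {occ, sum}.
    static int[] decode(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        return new int[] { buffer.getInt(), buffer.getInt() };
    }

    public static void main(String[] args) {
        int[] back = decode(encode(3, 42));
        System.out.println(back[0] + " " + back[1]); // 3 42
    }
}
```

If the round trip returns the original pair, the state store serde is unlikely to be the source of the exception, which is consistent with the trace pointing at the sink instead.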


MySerde:



    private class MySerde implements Serde<Tuple> {

        @Override
        public void configure(Map<String, ?> map, boolean bln) {
        }

        @Override
        public void close() {
        }

        @Override
        public Serializer<Tuple> serializer() {
            return new TupleSerializer();
        }

        @Override
        public Deserializer<Tuple> deserializer() {
            return new TupleDeserializer();
        }

    }









  • I don't think you need to convert to JSON here... That is just going to slow the process down
    – cricket_007
    Nov 20 at 23:18










  • It might help if you add MySerde, by the way
    – cricket_007
    Nov 20 at 23:20
















1














I need to calculate an average with a kafka streams. The producer produce with Avro and so i need to deserialize with it and i receive a GenericRecord with a json String that i have to elaborate.



I use a user defined type as support.



 private class Tuple {

public int occ;
public int sum;


public Tuple (int occ, int sum) {
this.occ = occ;
this.sum = sum;
}

public void sum (int toAdd) {
this.sum += toAdd;
this.occ ++;
}

public Double getAverage () {
return new Double (this.sum / this.occ);
}

public String toString() {
return "occorrenze: " + this.occ + ", somma: " + sum + ", media -> " + getAverage();
}

}


Now the elaboration:



 StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(topic);

KStream<GenericRecord, GenericRecord> branches = source.branch(
(key,value) -> partition(value.toString()),
(key, value) -> true
);

KGroupedStream <String, String> groupedStream = branches[0]
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
.groupByKey( Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String())); /* value */

branches[0].foreach((key, value) -> System.out.println(key + " " + value));

KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
() -> new Tuple(0, 0), // initializer
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
Materialized.as("aggregate-state-store").with(Serdes.String(), new MySerde()));

aggregatedStream
.toStream()
.foreach((key, value) -> System.out.println(key + ": " + value));

KStream<String, Double> average = aggregatedStream
.mapValues(v -> v.getAverage())
.toStream();


The problem is when i go to store the stream in a topic with:



 average.to("average");


Here the exception:



  Exception in thread "streamtest-6d743b83-ce22-435e-aee5-76a745ce3571-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000007
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:202)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:420)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:394)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:382)
at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1042)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer / value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Double). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:106)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 12 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:39)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 41 more


----- EDIT ------
i add tha class for serialize and deserialize



serializer:



  private class TupleSerializer implements Serializer<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public byte serialize(String string, Tuple t) {
ByteBuffer buffer = ByteBuffer.allocate(4 + 4);
buffer.putInt(t.occ);
buffer.putInt(t.sum);
return buffer.array();
}

@Override
public void close() {
}

}


deserializer:



 private class TupleDeserializer implements Deserializer<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public void close() {
}

@Override
public Tuple deserialize(String string, byte bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
int occ = buffer.getInt();
int sum = buffer.getInt();
return new Tuple (occ, sum);
}

}


MySerde:



private class MySerde implements Serde<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public void close() {
}

@Override
public Serializer<Tuple> serializer() {
return new TupleSerializer ();
}

@Override
public Deserializer<Tuple> deserializer() {
return new TupleDeserializer ();
}

}









share|improve this question
























  • I don't think you need to convert to JSON here... That is just going to slow the process down
    – cricket_007
    Nov 20 at 23:18










  • It might help if you add MySerde, by the way
    – cricket_007
    Nov 20 at 23:20














1












1








1







I need to calculate an average with a kafka streams. The producer produce with Avro and so i need to deserialize with it and i receive a GenericRecord with a json String that i have to elaborate.



I use a user defined type as support.



 private class Tuple {

public int occ;
public int sum;


public Tuple (int occ, int sum) {
this.occ = occ;
this.sum = sum;
}

public void sum (int toAdd) {
this.sum += toAdd;
this.occ ++;
}

public Double getAverage () {
return new Double (this.sum / this.occ);
}

public String toString() {
return "occorrenze: " + this.occ + ", somma: " + sum + ", media -> " + getAverage();
}

}


Now the elaboration:



 StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(topic);

KStream<GenericRecord, GenericRecord> branches = source.branch(
(key,value) -> partition(value.toString()),
(key, value) -> true
);

KGroupedStream <String, String> groupedStream = branches[0]
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
.groupByKey( Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String())); /* value */

branches[0].foreach((key, value) -> System.out.println(key + " " + value));

KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
() -> new Tuple(0, 0), // initializer
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
Materialized.as("aggregate-state-store").with(Serdes.String(), new MySerde()));

aggregatedStream
.toStream()
.foreach((key, value) -> System.out.println(key + ": " + value));

KStream<String, Double> average = aggregatedStream
.mapValues(v -> v.getAverage())
.toStream();


The problem is when i go to store the stream in a topic with:



 average.to("average");


Here the exception:



  Exception in thread "streamtest-6d743b83-ce22-435e-aee5-76a745ce3571-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000007
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:202)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:420)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:394)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:382)
at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1042)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer / value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Double). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:106)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 12 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:39)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 41 more


----- EDIT ------
i add tha class for serialize and deserialize



serializer:



  private class TupleSerializer implements Serializer<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public byte serialize(String string, Tuple t) {
ByteBuffer buffer = ByteBuffer.allocate(4 + 4);
buffer.putInt(t.occ);
buffer.putInt(t.sum);
return buffer.array();
}

@Override
public void close() {
}

}


deserializer:



 private class TupleDeserializer implements Deserializer<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public void close() {
}

@Override
public Tuple deserialize(String string, byte bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
int occ = buffer.getInt();
int sum = buffer.getInt();
return new Tuple (occ, sum);
}

}


MySerde:



private class MySerde implements Serde<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public void close() {
}

@Override
public Serializer<Tuple> serializer() {
return new TupleSerializer ();
}

@Override
public Deserializer<Tuple> deserializer() {
return new TupleDeserializer ();
}

}









share|improve this question















I need to calculate an average with a kafka streams. The producer produce with Avro and so i need to deserialize with it and i receive a GenericRecord with a json String that i have to elaborate.



I use a user defined type as support.



 private class Tuple {

public int occ;
public int sum;


public Tuple (int occ, int sum) {
this.occ = occ;
this.sum = sum;
}

public void sum (int toAdd) {
this.sum += toAdd;
this.occ ++;
}

public Double getAverage () {
return new Double (this.sum / this.occ);
}

public String toString() {
return "occorrenze: " + this.occ + ", somma: " + sum + ", media -> " + getAverage();
}

}


Now the elaboration:



 StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> source =
builder.stream(topic);

KStream<GenericRecord, GenericRecord> branches = source.branch(
(key,value) -> partition(value.toString()),
(key, value) -> true
);

KGroupedStream <String, String> groupedStream = branches[0]
.mapValues(value -> createJson(value.toString()))
.map((key, value) -> KeyValue.pair(new String("T_DUR_CICLO"), value.getNumberValue("payload", "T_DUR_CICLO")))
.groupByKey( Serialized.with(
Serdes.String(), /* key (note: type was modified) */
Serdes.String())); /* value */

branches[0].foreach((key, value) -> System.out.println(key + " " + value));

KTable<String, Tuple> aggregatedStream = groupedStream.aggregate(
() -> new Tuple(0, 0), // initializer
(aggKey, newValue, aggValue) -> new Tuple (aggValue.occ + 1, aggValue.sum + Integer.parseInt(newValue)),
Materialized.as("aggregate-state-store").with(Serdes.String(), new MySerde()));

aggregatedStream
.toStream()
.foreach((key, value) -> System.out.println(key + ": " + value));

KStream<String, Double> average = aggregatedStream
.mapValues(v -> v.getAverage())
.toStream();


The problem is when i go to store the stream in a topic with:



 average.to("average");


Here the exception:



  Exception in thread "streamtest-6d743b83-ce22-435e-aee5-76a745ce3571-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000007
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:202)
at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:420)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:394)
at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:382)
at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67)
at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:362)
at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:352)
at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:401)
at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1042)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:845)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
Caused by: org.apache.kafka.streams.errors.StreamsException: A serializer (key: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer / value: io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.Double). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:94)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:126)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:106)
at org.apache.kafka.streams.kstream.internals.KTableMapValues$KTableMapValuesProcessor.process(KTableMapValues.java:83)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:50)
at org.apache.kafka.streams.processor.internals.ProcessorNode.runAndMeasureLatency(ProcessorNode.java:244)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:133)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:143)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:129)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:90)
at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:101)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:125)
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.flush(CachingKeyValueStore.java:123)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.flush(InnerMeteredKeyValueStore.java:284)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.flush(MeteredKeyValueBytesStore.java:149)
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:239)
... 12 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.avro.generic.GenericRecord
at io.confluent.kafka.streams.serdes.avro.GenericAvroSerializer.serialize(GenericAvroSerializer.java:39)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:156)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
... 41 more


----- EDIT ------
I have added the classes used to serialize and deserialize Tuple.



serializer:



  private class TupleSerializer implements Serializer<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public byte[] serialize(String string, Tuple t) {
ByteBuffer buffer = ByteBuffer.allocate(4 + 4);
buffer.putInt(t.occ);
buffer.putInt(t.sum);
return buffer.array();
}

@Override
public void close() {
}

}


deserializer:



 private class TupleDeserializer implements Deserializer<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public void close() {
}

@Override
public Tuple deserialize(String string, byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
int occ = buffer.getInt();
int sum = buffer.getInt();
return new Tuple (occ, sum);
}

}


MySerde:



private class MySerde implements Serde<Tuple> {

@Override
public void configure(Map<String, ?> map, boolean bln) {
}

@Override
public void close() {
}

@Override
public Serializer<Tuple> serializer() {
return new TupleSerializer ();
}

@Override
public Deserializer<Tuple> deserializer() {
return new TupleDeserializer ();
}

}
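As a quick check of the byte layout used by the serializer and deserializer above, here is a minimal stand-alone sketch that needs no Kafka dependency. The class `TupleCodec` and its methods are hypothetical helpers, not part of the original code; they only mirror the two `putInt`/`getInt` calls. The `null` check is an assumption based on the Kafka `Deserializer` contract, which allows the byte array to be null, and `average` uses a floating-point division instead of the integer division in `getAverage`.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-alone helper mirroring TupleSerializer/TupleDeserializer,
// written without Kafka types so it can be compiled and run on its own.
final class TupleCodec {

    // Encode the occurrence count and the sum as two 4-byte ints (8 bytes total).
    static byte[] encode(int occ, int sum) {
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES * 2);
        buffer.putInt(occ);
        buffer.putInt(sum);
        return buffer.array();
    }

    // Decode in the same order the fields were written: occ first, then sum.
    // Returns null for null input instead of throwing.
    static int[] decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        int occ = buffer.getInt();
        int sum = buffer.getInt();
        return new int[] {occ, sum};
    }

    // Floating-point average; the cast avoids truncating 7 / 2 to 3.
    static double average(int occ, int sum) {
        return occ == 0 ? 0.0 : (double) sum / occ;
    }

    public static void main(String[] args) {
        byte[] bytes = encode(2, 7);
        int[] decoded = decode(bytes);
        // prints "8 bytes, occ=2, sum=7, avg=3.5"
        System.out.println(bytes.length + " bytes, occ=" + decoded[0]
                + ", sum=" + decoded[1] + ", avg=" + average(decoded[0], decoded[1]));
    }
}
```

If the round trip returns the same two values, the serde pair is consistent with itself, so the state store is unlikely to be the source of the exception.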






java apache-kafka avro apache-kafka-streams






edited Nov 20 at 23:23

























asked Nov 20 at 23:15









Andrea

205
















  • I don't think you need to convert to JSON here... That is just going to slow the process down
    – cricket_007
    Nov 20 at 23:18










  • It might help if you add MySerde, by the way
    – cricket_007
    Nov 20 at 23:20






























1 Answer


















3














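You have to pass the Serdes explicitly to the .to() method to override the default serdes. According to the exception, your defaults are the generic Avro serdes, which do not match the String key and Double value of this stream.



average.to("average", Produced.with(Serdes.String(), Serdes.Double()));



For more details, see: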



https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#writing-streams-back-to-kafka
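To illustrate why the mismatch only shows up at runtime as a ClassCastException, here is a plain-Java sketch with no Kafka dependency. The interface `Ser` and the classes `Rec`, `RecSer` and `StringSer` are hypothetical stand-ins, not the Kafka or Confluent classes. The sketch assumes that a configured default serializer is applied through an unchecked cast, which is consistent with the stack trace in the question, while an explicitly supplied serializer is type-checked at compile time.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins; none of these types come from Kafka.
final class DefaultSerdeSketch {

    interface Ser<T> {
        byte[] serialize(T value);
    }

    // Stand-in for the record type the default serializer expects.
    static final class Rec {
        final String body;
        Rec(String body) { this.body = body; }
    }

    static final class RecSer implements Ser<Rec> {
        public byte[] serialize(Rec value) {
            return value.body.getBytes(StandardCharsets.UTF_8);
        }
    }

    static final class StringSer implements Ser<String> {
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // The default serializer comes from configuration, so its type parameter
    // is unknown here; the unchecked cast compiles but may fail when called.
    @SuppressWarnings("unchecked")
    static <T> byte[] sendWithDefault(Ser<?> defaultSer, T value) {
        return ((Ser<T>) defaultSer).serialize(value);
    }

    // An explicitly passed serializer must match the value type at compile time.
    static <T> byte[] sendWith(Ser<T> ser, T value) {
        return ser.serialize(value);
    }

    // True if applying the record serializer to a String throws ClassCastException.
    static boolean defaultFails() {
        try {
            sendWithDefault(new RecSer(), "3.5");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("default serializer failed: " + defaultFails());
        System.out.println("explicit serializer wrote "
                + sendWith(new StringSer(), "3.5").length + " bytes");
    }
}
```

In the real topology, Produced.with(...) plays the role of `sendWith`: the serdes travel with the call, so the configured defaults are never consulted for that sink.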






        answered Nov 20 at 23:28









        Nishu Tayal
