Spring Cloud Stream Binder












Any ideas would be appreciated. I am trying to write a test as follows:



@ExtendWith(SpringExtension.class)
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "input", "output" },
        brokerProperties = { "broker.id=2", "listeners=PLAINTEXT://127.0.0.1:9092" })
class BindingTest {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private CustomBindings cBindings;

    /**
     * @throws java.lang.Exception
     */
    @BeforeEach
    void setUp() throws Exception {
    }

    /**
     * @throws java.lang.Exception
     */
    @AfterEach
    void tearDown() throws Exception {
        embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
    }

    @Test
    void test0() {
        String KEY = "KEY";
        String testMessage = "TESTMESSAGE";
        Message<String> message = MessageBuilder.withPayload(testMessage)
                .setHeader(KafkaHeaders.MESSAGE_KEY, KEY).build();
        cBindings.output().send(message);
    }

    @SpringBootApplication
    @EnableBinding(CustomBindings.class)
    public static class BindingApplication {

    }

}



and these properties:



spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.bindings.output.contentType=application/json



spring.cloud.stream.bindings.output.producer.header-mode=raw
spring.cloud.stream.bindings.output.producer.use-native-encoding=true



spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer



I still get:



error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@71f0806b]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer, failedMessage=GenericMessage [payload=byte[8], headers={id=c91897bc-2e4e-0a74-bc05-17fb31b690f6, kafka_messageKey=KEY, contentType=application/json, timestamp=1542295539746}]



which doesn't make sense to me.















    I believe you need to set useNativeDecoding to true since you're choosing to use native Kafka serde. See more here

    – Oleg Zhurakousky
    Nov 15 '18 at 15:38
















spring-cloud-stream spring-kafka
















asked Nov 15 '18 at 15:28









Zafar Ali























From the looks of it, this is not a Kafka Streams application, but a regular Spring Cloud Stream application with the Kafka binder. Therefore, you don't need these two properties.

spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer



In addition, in order to fix your error, you need to remove this line from your config: spring.cloud.stream.bindings.output.producer.use-native-encoding=true.



By setting native encoding to true, you are asking Kafka to do the serialization, which relies on the default ByteArraySerializer. If you really intended native serialization, you need to set the appropriate value serializer (StringSerializer). But since this is a test, I suggest you remove this property and see whether your test passes.
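
To make the error text easier to read: `[B` is the JVM name for `byte[]`, so the message says a `byte[]` payload reached a serializer declared for `String`. The sketch below reproduces that kind of mismatch without a broker. It is only an illustration under stated assumptions: `ValueSerializer`, `StringValueSerializer` and `send` are hypothetical stand-ins written for this example, not the Kafka or Spring Cloud Stream API, and the wording of the exception is copied from the question rather than taken from the library.

```java
import java.nio.charset.StandardCharsets;

class SerializerMismatchSketch {

    /** Hypothetical stand-in for a Kafka-style value serializer. */
    interface ValueSerializer<T> {
        byte[] serialize(T value);
    }

    /** Mimics a serializer that only understands String payloads. */
    static class StringValueSerializer implements ValueSerializer<String> {
        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    /**
     * Mimics a producer that receives an untyped payload and hands it to the
     * configured serializer. Because of type erasure, the mismatch only shows
     * up as a ClassCastException when the typed method is actually invoked.
     */
    @SuppressWarnings("unchecked")
    static byte[] send(Object payload, ValueSerializer<?> serializer) {
        try {
            return ((ValueSerializer<Object>) serializer).serialize(payload);
        } catch (ClassCastException e) {
            throw new IllegalStateException("Can't convert value of class "
                    + payload.getClass().getName() + " to class "
                    + serializer.getClass().getName(), e);
        }
    }

    public static void main(String[] args) {
        ValueSerializer<?> configured = new StringValueSerializer();

        // A String payload matches the configured serializer.
        System.out.println(send("TESTMESSAGE", configured).length + " bytes written");

        // A byte[] payload does not match, which mirrors the failure in the question.
        try {
            send(new byte[8], configured);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch is that the payload type and the configured serializer have to agree. If you do want native encoding, then as far as I know the Kafka binder takes per-binding Kafka client settings under `spring.cloud.stream.kafka.bindings.<channel>.producer.configuration.*`, so `value.serializer` would go there rather than under the `kafka.streams` prefix.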
















        answered Nov 15 '18 at 16:18









        sobychacko
































