Spring Cloud Stream Binder
Any ideas would be appreciated. I am trying to write a test as follows:
@ExtendWith(SpringExtension.class)
@EmbeddedKafka(count = 1, controlledShutdown = true, topics = { "input", "output" },
        brokerProperties = { "broker.id=2", "listeners=PLAINTEXT://127.0.0.1:9092" })
class BindingTest {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private CustomBindings cBindings;

    /**
     * @throws java.lang.Exception
     */
    @BeforeEach
    void setUp() throws Exception {
    }

    /**
     * @throws java.lang.Exception
     */
    @AfterEach
    void tearDown() throws Exception {
        embeddedKafka.getKafkaServers().forEach(b -> b.shutdown());
    }

    @Test
    void test0() {
        String KEY = "KEY";
        String testMessage = "TESTMESSAGE";
        Message<String> message = MessageBuilder.withPayload(testMessage)
                .setHeader(KafkaHeaders.MESSAGE_KEY, KEY).build();
        cBindings.output().send(message);
    }

    @SpringBootApplication
    @EnableBinding(CustomBindings.class)
    public static class BindingApplication {
    }
}
with the following properties:
spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.bindings.output.contentType=application/json
spring.cloud.stream.bindings.output.producer.header-mode=raw
spring.cloud.stream.bindings.output.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer
I still get:
error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@71f0806b]; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer, failedMessage=GenericMessage [payload=byte[8], headers={id=c91897bc-2e4e-0a74-bc05-17fb31b690f6, kafka_messageKey=KEY, contentType=application/json, timestamp=1542295539746}]
This doesn't make sense to me.
spring-cloud-stream spring-kafka
asked Nov 15 '18 at 15:28
Zafar Ali
I believe you need to set useNativeDecoding to true since you're choosing to use native Kafka serde. See more here
– Oleg Zhurakousky, Nov 15 '18 at 15:38
1 Answer
From the looks of it, this is not a Kafka Streams application but a regular Spring Cloud Stream application using the Kafka binder. Therefore, you don't need these two properties:
spring.cloud.stream.kafka.streams.bindings.output.producer.keySerde==org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.streams.bindings.output.producer.valueSerde==org.apache.kafka.common.serialization.StringSerializer
In addition, to fix your error, remove this line from your config:
spring.cloud.stream.bindings.output.producer.use-native-encoding=true
By setting native encoding to true, you are asking Kafka to do the serialization, which relies on the default ByteArraySerializer. If you really intended native serialization, you need to set the appropriate value serializer (StringSerializer). But since this is a test, I suggest you remove this property and see if your test passes.
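To illustrate why the error message reads the way it does, here is a minimal, dependency-free sketch. It assumes that the payload had already been converted to a byte[] by the time it reached the producer (the failedMessage in the question shows payload=byte[8]) and that the configured value serializer only accepts String. The Serializer, StringSerializer and ByteArraySerializer types below are hypothetical stand-ins written for this example, not the real Kafka classes; as far as I know, the real producer reports this kind of type mismatch as a SerializationException in much the same way.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    /** Hypothetical stand-in for Kafka's Serializer interface. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Stand-in for a String-only serializer. */
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Stand-in for a pass-through byte[] serializer. */
    static final class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    /**
     * Calls the serializer through its raw type, the way a generic producer
     * would, and turns a type mismatch into a readable message.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySend(Serializer serializer, Object value) {
        try {
            serializer.serialize("output", value);
            return "ok";
        } catch (ClassCastException e) {
            // The value's runtime type does not match what the serializer accepts.
            return "Can't convert value of class " + value.getClass().getName()
                    + " to class " + serializer.getClass().getName();
        }
    }

    public static void main(String[] args) {
        // The payload after conversion: bytes, no longer a String.
        byte[] converted = "TESTMESSAGE".getBytes(StandardCharsets.UTF_8);

        // Mismatch: a byte[] handed to a String-only serializer.
        System.out.println(trySend(new StringSerializer(), converted));
        // Match: byte[] with a byte[] serializer.
        System.out.println(trySend(new ByteArraySerializer(), converted));
        // Match: the original String with a String serializer.
        System.out.println(trySend(new StringSerializer(), "TESTMESSAGE"));
    }
}
```

The first call fails because the runtime type of the value ([B, that is, byte[]) is not what the serializer expects; the other two succeed. If native serialization is really what you want, my understanding is that the Kafka binder takes plain producer settings through its per-binding configuration map (for example spring.cloud.stream.kafka.bindings.output.producer.configuration.value.serializer), but check that property name against the binder documentation for your version.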
answered Nov 15 '18 at 16:18
sobychacko