Following on from How to Work with Apache Kafka in Your Spring Boot Application, which shows how to get started with Spring Boot and Apache Kafka®, we'll dig a little deeper into some of the additional features that the Spring for Apache Kafka project provides.
Spring for Apache Kafka brings the familiar Spring programming model to Kafka. It provides the KafkaTemplate for publishing records and a listener container for asynchronous execution of POJO listeners. Spring Boot auto-configuration wires up much of the infrastructure so that you can concentrate on your business logic.
Error Recovery
Consider this simple POJO listener method:

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(String in) {
    logger.info("Received: " + in);
    if (in.startsWith("foo")) {
        throw new RuntimeException("failed");
    }
}

By default, records that fail are simply logged, and we move on to the next one. We can, however, configure an error handler in the listener container to perform some other action. To do so, we override Spring Boot's auto-configured container factory with our own:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    // Apply Spring Boot's auto-configured settings first, then customize.
    configurer.configure(factory, kafkaConsumerFactory);
    factory.setErrorHandler(new SeekToCurrentErrorHandler()); // <<<<<<
    return factory;
}

Note that we can still leverage much of the auto-configuration too.
The SeekToCurrentErrorHandler discards remaining records from the poll() and performs seek operations on the consumer to reset the offsets, so that the discarded records are fetched again on the next poll. By default, the error handler tracks the failed record, gives up after 10 delivery attempts, and logs the failed record. However, we can also send the failed message to another topic. We call this a dead letter topic.
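To make the retry-then-recover idea concrete, here is a minimal, library-free sketch. It is only a simulation under simplifying assumptions: the class and method names (RetryThenDeadLetterSketch, deliver) are hypothetical, and it models just the per-record attempt counting and the hand-off to a recoverer, not the seek and re-poll mechanics that the real error handler performs on the consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simulation; this is NOT Spring for Apache Kafka's implementation.
final class RetryThenDeadLetterSketch {

    /**
     * Delivers each record to the listener up to maxAttempts times.
     * A record that still fails on its last attempt is handed to the
     * recoverer (the "dead letter" step). Returns the total number of
     * delivery attempts made.
     */
    static int deliver(List<String> records, int maxAttempts,
                       Consumer<String> listener, Consumer<String> recoverer) {
        int attempts = 0;
        for (String record : records) {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                attempts++;
                try {
                    listener.accept(record);
                    break; // success: move on to the next record
                } catch (RuntimeException e) {
                    if (attempt == maxAttempts) {
                        recoverer.accept(record); // retries exhausted
                    }
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        int attempts = deliver(Arrays.asList("bar", "foo1", "baz"), 3,
                in -> {
                    // Same failure rule as the listener in this article.
                    if (in.startsWith("foo")) {
                        throw new RuntimeException("failed");
                    }
                },
                deadLetters::add);
        // prints: attempts=5, deadLetters=[foo1]
        System.out.println("attempts=" + attempts + ", deadLetters=" + deadLetters);
    }
}
```

In this simulation the recoverer is just a list; in a real application that role would be played by something that publishes the failed record to the dead letter topic.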
The following example puts it all together:

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory,
        KafkaTemplate<Object, Object> template) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    // Give up after 3 delivery attempts, then publish the record to the dead letter topic.
    factory.setErrorHandler(new SeekToCurrentErrorHandler(
            new DeadLetterPublishingRecoverer(template), 3));
    return factory;
}

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(String in) {
    logger.info("Received: " + in);
    if (in.startsWith("foo")) {
        throw new RuntimeException("failed");
    }
}

// Consumes the records that the recoverer published after the retries were exhausted.
@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(String in) {
    logger.info("Received from DLT: " + in);
}

Deserialization Errors
But what about deserialization exceptions, which occur before Spring gets the record? Enter the ErrorHandlingDeserializer. This deserializer wraps a delegate deserializer and catches any exceptions. These are then forwarded to the listener container, which sends them directly to the error handler. The exception contains the source data so you can diagnose the problem.
Domain Objects and Inferring the Type
Consider the following example:
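The delegate-wrapping idea can be sketched without any Kafka dependency. The following is an illustration only: SafeDeserializerSketch and its Result type are hypothetical names invented for this example, not the library's classes, and a plain Function stands in for the delegate deserializer.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical illustration of wrapping a delegate; these are NOT the
// Spring for Apache Kafka classes.
final class SafeDeserializerSketch {

    /** Either a deserialized value, or the failure together with the raw source bytes. */
    static final class Result<T> {
        final T value;
        final RuntimeException error;
        final byte[] rawData;

        Result(T value, RuntimeException error, byte[] rawData) {
            this.value = value;
            this.error = error;
            this.rawData = rawData;
        }

        boolean failed() {
            return error != null;
        }
    }

    /** Calls the delegate and captures any exception instead of letting it escape. */
    static <T> Result<T> deserialize(Function<byte[], T> delegate, byte[] data) {
        try {
            return new Result<>(delegate.apply(data), null, data);
        } catch (RuntimeException e) {
            // Keep the original bytes so the problem can be diagnosed later.
            return new Result<>(null, e, data);
        }
    }

    public static void main(String[] args) {
        Function<byte[], Integer> intDelegate =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));

        Result<Integer> ok = deserialize(intDelegate, "42".getBytes(StandardCharsets.UTF_8));
        Result<Integer> bad = deserialize(intDelegate, "oops".getBytes(StandardCharsets.UTF_8));

        System.out.println(ok.failed() + " " + ok.value);   // prints: false 42
        System.out.println(bad.failed() + " "
                + new String(bad.rawData, StandardCharsets.UTF_8)); // prints: true oops
    }
}
```

Because the failure is returned as data rather than thrown, the caller can decide what to do with it; handing it to an error handler, as described above, is one such choice.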
@Bean
public RecordMessageConverter converter() {
    return new StringJsonMessageConverter();
}

@KafkaListener(id = "fooGroup", topics = "topic1")
public void listen(Foo2 foo) {
    logger.info("Received: " + foo);
    if (foo.getFoo().startsWith("fail")) {
        throw new RuntimeException("failed");
    }
}

@KafkaListener(id = "dltGroup", topics = "topic1.DLT")
public void dltListen(Foo2 in) {
    logger.info("Received from DLT: " + in);
}

Notice that we are now consuming objects of type Foo2. The message converter bean infers the type to convert to from the parameter type in the method signature.
The converter automatically "trusts" the type. Spring Boot auto-configures the converter into the listener container.
On the producer side, the sent object can be a different class (as long as it is type compatible):
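As a rough illustration of what "inferring the type from the method signature" means, the sketch below reads a listener method's parameter type with plain Java reflection. It is an assumption-laden toy: InferTypeSketch, its nested Listener, and the empty Foo2 stand-in are hypothetical, and the actual JSON conversion step is deliberately left out.

```java
import java.lang.reflect.Method;

// Hypothetical sketch; it only shows how a parameter type can be discovered.
final class InferTypeSketch {

    /** Empty stand-in for the article's Foo2 domain class. */
    static final class Foo2 {
    }

    /** Stand-in listener with a single-argument method, like listen(Foo2 foo) above. */
    static final class Listener {
        public void listen(Foo2 foo) {
            // no-op: only the signature matters for this sketch
        }
    }

    /** Returns the parameter type of the named single-argument method. */
    static Class<?> inferPayloadType(Class<?> listenerClass, String methodName) {
        for (Method method : listenerClass.getDeclaredMethods()) {
            if (method.getName().equals(methodName) && method.getParameterCount() == 1) {
                return method.getParameterTypes()[0];
            }
        }
        throw new IllegalArgumentException("No single-argument method named " + methodName);
    }

    public static void main(String[] args) {
        // prints: Foo2
        System.out.println(inferPayloadType(Listener.class, "listen").getSimpleName());
    }
}
```

Since the target type comes from the application's own method signature rather than from the incoming record, there is no untrusted class name to validate, which is one way to read why the converter can "trust" it.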
@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/foo/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send("topic1", new Foo1(what));
    }

}

And:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

$ curl -X POST http://localhost:8080/send/foo/fail

Here, we use a StringDeserializer and the "smart" message converter on the consumer side.
Multi-Method Listeners
We can also use a single listener container and route to specific methods based on the type. We can't infer the type this time since the type is used to select the method to call.
Instead, we rely on type information passed in the record headers to map from the source type to the target type. Also, since we do not infer the type, we need to configure the message converter to "trust" the package for the mapped type.
In this case, we'll use a message converter on both sides (together with a StringSerializer and a StringDeserializer). The following example of the consumer-side converter puts it all together:
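The header-based lookup described above can be pictured as a small table from type-id tokens to classes, guarded by a list of trusted packages. The sketch below follows that description only loosely; TypeIdMappingSketch and its methods are hypothetical, JDK classes stand in for the domain types, and the library's own type mapper may apply these checks differently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; this is NOT the library's type mapper.
final class TypeIdMappingSketch {

    private final Map<String, Class<?>> idToClass = new HashMap<>();
    private final Set<String> trustedPackages = new HashSet<>();

    void addTrustedPackage(String packageName) {
        trustedPackages.add(packageName);
    }

    void addMapping(String typeId, Class<?> target) {
        idToClass.put(typeId, target);
    }

    /** Resolves a type-id token taken from a header, rejecting untrusted packages. */
    Class<?> resolve(String typeId) {
        Class<?> target = idToClass.get(typeId);
        if (target == null) {
            throw new IllegalArgumentException("No mapping for type id '" + typeId + "'");
        }
        Package pkg = target.getPackage();
        String packageName = (pkg == null) ? "" : pkg.getName();
        if (!trustedPackages.contains(packageName)) {
            throw new IllegalArgumentException("Package '" + packageName + "' is not trusted");
        }
        return target;
    }

    public static void main(String[] args) {
        TypeIdMappingSketch mapper = new TypeIdMappingSketch();
        mapper.addTrustedPackage("java.util");
        mapper.addMapping("foo", java.util.ArrayList.class); // stand-in for a trusted domain class
        mapper.addMapping("bar", java.io.File.class);        // stand-in from an untrusted package

        System.out.println(mapper.resolve("foo").getName()); // prints: java.util.ArrayList
        try {
            mapper.resolve("bar");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints: Package 'java.io' is not trusted
        }
    }
}
```

Using short tokens instead of fully qualified class names in the header is what lets the producer and consumer use different classes for the same logical type.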
@Bean
public RecordMessageConverter converter() {
    StringJsonMessageConverter converter = new StringJsonMessageConverter();
    DefaultJackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
    // Use the type id header, not the method parameter, to pick the target type.
    typeMapper.setTypePrecedence(TypePrecedence.TYPE_ID);
    typeMapper.addTrustedPackages("com.common");
    Map<String, Class<?>> mappings = new HashMap<>();
    mappings.put("foo", Foo2.class);
    mappings.put("bar", Bar2.class);
    typeMapper.setIdClassMapping(mappings);
    converter.setTypeMapper(typeMapper);
    return converter;
}

Here, we map from "foo" to class Foo2 and "bar" to class Bar2. Notice that we have to tell it to use the TYPE_ID header to determine the type for the conversion. Again, Spring Boot auto-configures the message converter into the container. Below is the producer-side type mapping in a snippet of the application.yml file; the format is a comma-delimited list of token:FQCN:

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring.json.type.mapping: foo:com.common.Foo1,bar:com.common.Bar1

This configuration maps class Foo1 to "foo" and class Bar1 to "bar."
Listener:
@Component
@KafkaListener(id = "multiGroup", topics = { "foos", "bars" })
public class MultiMethods {

    // The consumer-side converter maps "foo" to Foo2 and "bar" to Bar2,
    // so the handlers take those types.
    @KafkaHandler
    public void foo(Foo2 foo) {
        System.out.println("Received: " + foo);
    }

    @KafkaHandler
    public void bar(Bar2 bar) {
        System.out.println("Received: " + bar);
    }

    // Fallback for payloads that match no other handler.
    @KafkaHandler(isDefault = true)
    public void unknown(Object object) {
        System.out.println("Received unknown: " + object);
    }

}

Producer:
@RestController
public class Controller {

    @Autowired
    private KafkaTemplate<Object, Object> template;

    @PostMapping(path = "/send/foo/{what}")
    public void sendFoo(@PathVariable String what) {
        this.template.send(new GenericMessage<>(new Foo1(what),
                Collections.singletonMap(KafkaHeaders.TOPIC, "foos")));
    }

    // Bar1 is the producer-side class mapped to the "bar" token in application.yml.
    @PostMapping(path = "/send/bar/{what}")
    public void sendBar(@PathVariable String what) {
        this.template.send(new GenericMessage<>(new Bar1(what),
                Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));
    }

    // A plain String has no type mapping, so it reaches the default handler.
    @PostMapping(path = "/send/unknown/{what}")
    public void sendUnknown(@PathVariable String what) {
        this.template.send(new GenericMessage<>(what,
                Collections.singletonMap(KafkaHeaders.TOPIC, "bars")));
    }

}

Transactions
Transactions are enabled by setting the transaction-id-prefix in the application.yml file:
spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx.
    consumer:
      properties:
        isolation.level: read_committed

When using spring-kafka 1.3.x or later and a kafka-clients version that supports transactions (0.11 or later), any KafkaTemplate operations performed in a @KafkaListener method will participate in the transaction, and the listener container will send the offsets to the transaction before committing it. Notice that we also set the isolation level for the consumers so that they have no visibility into uncommitted records. The following example pauses the listener so that we can see the effect of this:

@KafkaListener(id = "fooGroup2", topics = "topic2")
public void listen(List<Foo2> foos) throws IOException {
    logger.info("Received: " + foos);
    foos.forEach(f -> kafkaTemplate.send("topic3", f.getFoo().toUpperCase()));
    logger.info("Messages sent, hit Enter to commit tx");
    // Block until Enter is pressed; the transaction commits when the method returns.
    System.in.read();
}

@KafkaListener(id = "fooGroup3", topics = "topic3")
public void listen(String in) {
    logger.info("Received: " + in);
}

The producer for this example sends multiple records in a single transaction:

@PostMapping(path = "/send/foos/{what}")
public void sendFoo(@PathVariable String what) {
    this.template.executeInTransaction(kafkaTemplate -> {
        StringUtils.commaDelimitedListToSet(what).stream()
            .map(s -> new Foo1(s))
            .forEach(foo -> kafkaTemplate.send("topic2", foo));
        return null;
    });
}

$ curl -X POST http://localhost:8080/send/foos/a,b,c,d,e

Received: [Foo2 [foo=a], Foo2 [foo=b], Foo2 [foo=c], Foo2 [foo=d], Foo2 [foo=e]]
Messages sent, hit Enter to commit tx
Received: [A, B, C, D, E]

Conclusion
Using Spring with Apache Kafka can eliminate much of the boilerplate code that you otherwise need. It also adds features such as error handling, retrying, and record filtering, and we've only just touched the surface.
Topics:
integration, apache kafka, error handling, message conversion, transaction support, spring boot
Source: https://dzone.com/articles/spring-for-apache-kafka-deep-dive-part-1-error-han
Posted by: zemanhicat1936.blogspot.com
