我正在使用 mysql 数据库和 kafka 作为消息服务运行 spring boot 应用程序(使用 chainedKafkaTransactionManager 为 kafka 和 mysql 进行事务同步)以进行一些异步操作。
当多条消息到达 kafka 监听器时,旧数据来自数据库而不是以前提交的数据。
我正在使用 crud 存储库,而且这种情况只会同时发生在多条消息上
示例:更新一个具有名称和 ID 的对象 PERSON;
第一条消息将通过 id 获取对象并将名称更新为 SAM。
第二条消息将获取对象并将名称更新为 REGO
第三条消息将获取对象,然后如果我检查它包含 SAM 作为名称的数据,但在数据库中它有 REGO。
我尝试添加隔离属性作为事务中提交的读取但没有运气
// listeners
@Autowired
private PersonRepository personRepository;
@Autowired
private AddressRepository addressRepository;
@KafkaListener(id = "update_name", topics = "update_name")
@Transactional(readOnly = false)
public void updateName(PersonModel personModel) {
Person person = personRepository.findById(personModel.getId());
log.info("before -> name which is in database : " + person.getName());
person.setName(personModel.getName());
person = personRepository.save(person);
log.info("after-> name which is in database : " + person.getName());
}
@KafkaListener(id = "update_name_and_address", topics = "update_name_and_address")
@Transactional(readOnly = false)
public void updateNameAndAddress(PersonModel personModel) {
Address addr= addressRepository.findById(personModel.getAddresId);
addr.setPlace(personModel.getPlace());
addressRepository.save(addr);
updateName(personModel);
}
// repository
public interface PersonRepository extends CrudRepository<Person , Integer> {
}
我需要数据库中的最新数据
Qyouu
相关分类