本文通过一个模拟场景介绍Kafka连接器的简单使用。场景为消费者下发订餐信息(消息队列Kafka的Topic发送消息),订餐信息持久化存储到指定数据库。
前提条件
- 在阿里云容器服务Kubernetes版上创建了Kafka服务和Database服务。具体操作,请参见使用镜像快速创建无状态Deployment应用。
- 已经在Kafka服务器创建Topic,用于模拟消费者下发订餐信息。
背景信息
本文介绍的是实现订阅订餐信息,订餐信息持久化处理的配置流程。
Kafka的主题TopicA发送消息模拟消费者下发订餐信息,然后订餐信息存储到数据库的orders表。
订餐信息内容假设为消费者ID(customer_id)、商家ID(restaurant_id)和食品(food)的JSON语句,示例如下:
{
"customer_id": 200,
"restaurant_id": 800,
"food": "apple"
}
orders表中也设置了customer_id、restaurant_id和food字段,以便和订餐信息一一对应。
创建的集成主要完成以下动作:
- 订阅订餐信息(即TopicA发送的消息)。
- 订餐信息持久化存储(即保存消息到数据库表)。
视频教程
创建连接
本示例中会用到Kafka和Database连接,所以需要借助连接器创建对应的连接。
创建Kafka和Database连接。
创建连接的具体操作,请参见创建连接。
创建空白集成
- 登录应用集成控制台。
- 在顶部菜单栏,选择地域。
- 在左侧导航栏,选择 。
- 在集成列表页面,选择目标命名空间,然后单击新建集成。
- 在新建集成面板,选择创建方式为空白流,选择目标环境,输入集成名称,然后单击创建。
- 集成创建后,进入集成设计页面,在页面右上角单击保存。
创建集成流
部署集成
集成创建并保存后,需要对集成进行部署。具体操作,请参见部署集成示例。