探索如何使用 ChatGPT 创建物联网 Kafka 事件消费者,并使用 API 逻辑服务器来生成定义范围之外的温度读取事件。
Apache Kafka 已成为从静态数据(数据库事务)迁移到事件流的企业架构的明显领导者。有许多演示文稿解释了 Kafka 的工作原理以及如何扩展此技术堆栈(本地或云)。使用 ChatGPT 构建一个微服务来消费消息并丰富、转换和持久化是该项目的下一阶段。在此示例中,我们将使用来自 IoT 设备 (RaspberryPi) 的输入,该设备每隔几秒钟发送一次 JSON 温度读数。
使用消息
生成(并记录)每条 Kafka 事件消息时,Kafka 微服务使用者已准备好处理每条消息。我让 ChatGPT 生成一些 Python 代码,它为我提供了从命名的“主题”中轮询和读取的基础知识。我得到的是一个非常好的开始,可以消耗主题、键和 JSON 有效负载。ChatGPT 创建了代码,使用 SQLAlchemy 将其持久化到数据库中。然后,我想转换 JSON 有效负载,并使用 API Logic Server(ALS - GitHub 上的一个开源项目)规则来取消 JSON,验证、计算并根据给定范围之外的源温度生成一组新的消息有效负载。
ChatGPT: “design a Python Event Streaming Kafka Consumer interface”
注意:ChatGPT 选择了 Confluent Kafka 库(并使用其 Docker Kafka 容器)- 您可以修改代码以使用其他 Python Kafka 库。
SQLAlchemy模型
使用 API Logic Server(ALS:Python 开源平台),我们连接到 MySQL 数据库。ALS 将读取这些表,并为每个 ORM 端点创建一个 SQLAlchemy ORM 模型、一个 react-admin 用户界面、safrs-JSON Open API (Swagger) 和一个正在运行的 REST Web 服务。新的温度表将包含时间戳、IoT 设备 ID 和温度读数。在这里,我们使用 ALS 命令行实用程序来创建 ORM 模型:
ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot
API Logic Server 生成的类用于保存我们的值。Temperature
class Temperature(SAFRSBase, Base):br
__tablename__ = 'Temperature'br
_s_collection_name = 'Temperature' # type: ignorebr
__bind_key__ = 'None'br
br
Id = Column(Integer, primary_key=True)br
DeviceId = Column(Integer, nullable=False)br
TempReading = Column(Integer, nullable=False)br
CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)br
KafkaMessageSent = Column(Booelan, default=text("False"))
变化
因此,我们不是将 Kafka JSON 使用者消息再次保存在 SQL 数据库中(并触发规则来执行工作),而是解包 JSON 有效负载 () 并将其插入 Temperature 表,而不是保存 JSON 有效负载。我们让声明性规则处理每个温度读数。util.row_to_entity
蟒
entity = models.Temperature()br
util.row_to_entity(message_data, entity) br
session.add(entity)
当消费者收到消息时,它会将其添加到会话中,从而触发规则(如下)。commit_event
声明性逻辑:生成消息
使用 API Logic Server(使用 SQLAlchemy、Flask 和类似 LogicBank 电子表格的规则引擎构建的自动化框架:公式、总和、计数、复制、约束、事件等),我们在 ORM 实体上添加一个声明性规则。当每条消息都保存到 Temperature 表中时,将调用该规则。如果温度读数超过或小于 ,我们将发送有关该主题的 Kafka 消息。我们还添加了一个约束,以确保我们在正常范围 (-) 内接收数据。我们将让另一个事件使用者处理警报消息。
commit_eventTemperaturecommit_eventMAX_TEMPMIN_TEMP“TempRangeAlert”32132TDD 行为测试
使用 TDD(Test Driven Development),我们可以编写一个 Behave 测试,将记录直接插入到 Temperature 表中,然后检查返回值。行为以 /(.feature 文件)开头。对于每个场景,我们使用装饰器编写相应的 Python 类。
KafkaMessageSentFeatureScenarioBehave
功能定义
Feature: TDD Temperature Examplebr
br
Scenario: Temperature Processingbr
Given A Kafka Message Normal (Temperature)br
When Transactions normal temperature is submittedbr
Then Check KafkaMessageSent Flag is Falsebr
br
Scenario: Temperature Processingbr
Given A Kafka Message Abnormal (Temperature)br
When Transactions abnormal temperature is submittedbr
Then Check KafkaMessageSent Flag is True
TDD Python 类
from behave import *br
import safrsbr
br
db = safrs.DB br
session = db.sessionbr
br
def insertTemperature(temp:int) -> bool:br
entity = model.Temperature()br
entity.TempReading = tempbr
entity.DeviceId = 'local_behave_test'br
session.add(entity) br
return entity.KafkaMessageSent br
br
@given('A Kafka Message Normal (Temperature)')br
def step_impl(context):br
context.temp = 76br
assert Truebr
br
@when('Transactions normal temperature is submitted')br
def step_impl(context):br
context.response_text = insertTemperature(context.temp)br
br
@then('Check KafkaMessageSent Flag is False')br
def step_impl(context):br
assert context.response_text == False
总结
使用 ChatGPT 为 Consumer 和 Producer 生成 Kafka 消息代码似乎是一个很好的起点。安装 Confluent Docker for Kafka。将 API Logic Server 用于声明性逻辑规则,使我们能够将公式、约束和事件添加到正常的事务流中,并将其添加到我们的 SQL 数据库中,并生成(和转换)新的 Kafka 消息,这是一个很好的组合。ChatGPT 和声明式逻辑是“配对编程”的下一个层次。
from confluent_kafka import Producerbr
conf = {'bootstrap.servers': 'localhostd:9092'}br
producer = Producer(conf)br
MAX_TEMP = arg.MAX_TEMP or 102br
MIN_TEMP = arg.MIN_TTEMP or 78br
br
def produce_message(br
row: models.KafkaMessage, br
old_row: models.KafkaMessage, br
logic_row: LogicRow):br
br
if logic_row.isInserted() and row.TempReading > MAX_TEMP:br
produce(topic="TempRangeAlert", br
key=row.Id,br
value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")br
row.KafkaMessageSent = Truebr
br
if logic_row.isInserted() and row.TempReading < MIN_TEMP:br
produce(topic="TempRangeAlert", br
key=row.Id,br
value=f"The temperature {row.TempReading}F less than {MIN_TEMP}F on Device {row.DeviceId}")br
br
row.KafkaMessageSent = Truebr
br
Rules.constraint(models.Temperature, br
as_expression= lambda row: row.TempReading < 32 or row.TempReading > 132, br
error_message= "Temperature {row.TempReading} is out of range"br
Rules.commit_event(models.Temperature, calling=produce_message)
仅当温度读数大于或小于时才会生成警报消息。Constraint 将在调用 commit 事件之前检查温度范围(请注意,规则始终是无序的,可以随着规范的变化而引入)。MAX_TEMPMIN_TEMP
原文标题:Kafka Event Streaming AI and Automation
原文链接:
https://dzone.com/articles/event-streaming-ai-amp-automation
作者:Tyler Band
编译:LCR