| import boto3
|
| import mysql.connector
|
| import json
|
|
|
# Amazon SQS client plus the URLs of the two queues used by this worker:
# work items are read from the work queue and results are published to the
# result queue. The region, account id and queue names look like
# placeholders and presumably must be replaced before running.
work_queue_url = 'https://sqs.your-region.amazonaws.com/your-account-id/ai-work-queue'
result_queue_url = 'https://sqs.your-region.amazonaws.com/your-account-id/ai-result-queue'
sqs_client = boto3.client('sqs', region_name='your-region')

# Connection settings for the Amazon Aurora MySQL database. The dict is
# unpacked as keyword arguments into mysql.connector.connect().
db_config = dict(
    user='your-username',
    password='your-password',
    host='your-aurora-endpoint',
    database='your-database-name',
)
|
|
|
def receive_work_message():
    """Fetch at most one message from the work queue.

    Uses SQS long polling (``WaitTimeSeconds=20``) so that a caller polling
    in a loop waits on the service while the queue is empty instead of
    issuing back-to-back empty receive requests.

    Returns:
        The first message dict of the ``receive_message`` response, or
        ``None`` when the response carries no messages.
    """
    response = sqs_client.receive_message(
        QueueUrl=work_queue_url,
        MaxNumberOfMessages=1,
        # Long polling: wait up to 20 seconds (the SQS maximum) for a message.
        WaitTimeSeconds=20,
    )
    # A missing key and an empty list both mean "nothing received".
    messages = response.get('Messages')
    if messages:
        return messages[0]
    return None
|
|
|
def process_work_message(message_body):
    """Process one work item and return its result.

    Placeholder implementation: ``message_body`` is not inspected and a
    fixed result is returned. Replace the body with the real processing
    logic.

    Args:
        message_body: The decoded body of a work-queue message.

    Returns:
        A dict with the keys ``'mykey'`` and ``'myvalue'``.
    """
    # Stand-in outcome: assume processing produced mykey='key1',
    # myvalue='value1'.
    outcome = dict(mykey='key1', myvalue='value1')
    return outcome
|
|
|
def send_result_message(result):
    """Publish a processing result to the result queue.

    The message body is the fixed text ``'Result message'``; the payload
    travels as two String message attributes, ``mykey`` and ``myvalue``.

    Args:
        result: Mapping that provides the ``'mykey'`` and ``'myvalue'``
            entries.
    """
    # One String-typed attribute per result field, in the original order.
    attributes = {
        name: {'DataType': 'String', 'StringValue': result[name]}
        for name in ('mykey', 'myvalue')
    }
    sqs_client.send_message(
        QueueUrl=result_queue_url,
        MessageBody='Result message',
        MessageAttributes=attributes,
    )
|
|
|
def store_result_in_db(result):
    """Upsert a processing result into the ``kv`` table.

    Opens a new connection from ``db_config``, inserts the
    ``(mykey, myvalue)`` pair (updating ``myvalue`` when the key already
    exists), commits, and closes the connection again.

    Database errors are printed and not re-raised, so the caller cannot
    tell from the return value whether the write succeeded.

    Args:
        result: Mapping that provides the ``'mykey'`` and ``'myvalue'``
            entries.
    """
    # Pre-bind both handles so the ``finally`` clause is safe even when
    # connect() or cursor() raises before the names would be assigned.
    connection = None
    cursor = None
    try:
        connection = mysql.connector.connect(**db_config)
        cursor = connection.cursor()
        sql = "INSERT INTO kv (mykey, myvalue) VALUES (%s, %s) ON DUPLICATE KEY UPDATE myvalue = VALUES(myvalue)"
        # Parameterized query: the driver escapes the values.
        cursor.execute(sql, (result['mykey'], result['myvalue']))
        connection.commit()
    except mysql.connector.Error as err:
        print(f"Error: {err}")
    finally:
        # Only tear down what was actually created and is still connected.
        if connection is not None and connection.is_connected():
            if cursor is not None:
                cursor.close()
            connection.close()
|
|
|
def main():
    """Run the worker loop forever.

    Each iteration takes one message from the work queue, decodes its JSON
    body, processes it, publishes the result to the result queue, stores
    the result in the database and finally deletes the message from the
    work queue.
    """
    while True:
        message = receive_work_message()
        if not message:
            # Nothing was received this round; poll again.
            continue

        payload = json.loads(message['Body'])
        result = process_work_message(payload)
        send_result_message(result)
        store_result_in_db(result)

        # Delete the processed message from the work queue.
        sqs_client.delete_message(
            QueueUrl=work_queue_url,
            ReceiptHandle=message['ReceiptHandle'],
        )
|
|
|
# Start the worker loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
|