| CREATE TABLE kv ( |
| mykey VARCHAR(255) NOT NULL, |
| myvalue VARCHAR(255), |
| PRIMARY KEY (mykey) |
| ); |
| #### 4. 设置安全性和权限 |
| - **IAM 脚色和策略**: |
| - 为 `server_ai` 创建一个 IAM 脚色,并附加适当的策略,答应其访问 Amazon SQS 消息队列和 Amazon Aurora MySQL 数据库。 |
| - 确保策略中包罗对 `sqs:SendMessage`、`sqs:ReceiveMessage`、`sqs ![]() |
| |
| - **安全组**: |
| - 检查 Amazon Aurora MySQL 数据库的安全组设置,确保答应 `server_ai` 所在的实例或网络访问数据库。 |
| - 同样,检查 Amazon SQS 消息队列的访问策略,确保 `server_ai` 具有相应的权限。 |
| |
| ### 总结 |
| 通过以上步骤,你可以在亚马逊云中部署一个兼容 MySQL 的数据 |
| CREATE TABLE kv ( |
| mykey VARCHAR(255) NOT NULL, |
| myvalue VARCHAR(255), |
| PRIMARY KEY (mykey) |
| ); |
| import boto3 |
| import mysql.connector |
| import json |
| |
| # 设置 Amazon SQS 客户端 |
| sqs_client = boto3.client('sqs', region_name='your-region') |
| work_queue_url = 'https://sqs.your-region.amazonaws.com/your-account-id/ai-work-queue' |
| result_queue_url = 'https://sqs.your-region.amazonaws.com/your-account-id/ai-result-queue' |
| |
| # 设置 Amazon Aurora MySQL 数据库连接 |
| db_config = { |
| 'user': 'your-username', |
| 'password': 'your-password', |
| 'host': 'your-aurora-endpoint', |
| 'database': 'your-database-name' |
| } |
| |
| def receive_work_message(): |
| response = sqs_client.receive_message( |
| QueueUrl=work_queue_url, |
| MaxNumberOfMessages=1 |
| ) |
| if 'Messages' in response: |
| return response['Messages'][0] |
| return None |
| |
| def process_work_message(message_body): |
| # 在这里实现工作信息的处置惩罚逻辑 |
| # 假设处置惩罚结果为 mykey='key1', myvalue='value1' |
| return {'mykey': 'key1', 'myvalue': 'value1'} |
| |
| def send_result_message(result): |
| sqs_client.send_message( |
| QueueUrl=result_queue_url, |
| MessageBody='Result message', |
| MessageAttributes={ |
| 'mykey': {'DataType': 'String', 'StringValue': result['mykey']}, |
| 'myvalue': {'DataType': 'String', 'StringValue': result['myvalue']} |
| } |
| ) |
| |
| def store_result_in_db(result): |
| try: |
| connection = mysql.connector.connect(**db_config) |
| cursor = connection.cursor() |
| sql = "INSERT INTO kv (mykey, myvalue) VALUES (%s, %s) ON DUPLICATE KEY UPDATE myvalue = VALUES(myvalue)" |
| cursor.execute(sql, (result['mykey'], result['myvalue'])) |
| connection.commit() |
| except mysql.connector.Error as err: |
| print(f"Error: {err}") |
| finally: |
| if connection.is_connected(): |
| cursor.close() |
| connection.close() |
| |
| def main(): |
| while True: |
| message = receive_work_message() |
| if message: |
| message_body = json.loads(message['Body']) |
| result = process_work_message(message_body) |
| send_result_message(result) |
| store_result_in_db(result) |
| # 删除已处置惩罚的消息 |
| sqs_client.delete_message( |
| QueueUrl=work_queue_url, |
| ReceiptHandle=message['ReceiptHandle'] |
| ) |
| |
| if __name__ == '__main__': |
| main() |
欢迎光临 qidao123.com技术社区-IT企服评测·应用市场 (https://dis.qidao123.com/) | Powered by Discuz! X3.4 |