实例讲解Python 解析JSON实现主机管理

打印 上一主题 下一主题

主题 651|帖子 651|积分 1953

本文分享自华为云社区《Python 解析JSON实现主机管理》,作者: LyShark。
JSON(JavaScript Object Notation)是一种轻量级的数据交换格式,它以易于阅读和编写的文本形式表示数据。JSON 是一种独立于编程语言的数据格式,因此在不同的编程语言中都有对应的解析器和生成器。JSON 格式的设计目标是易于理解、支持复杂数据结构和具有良好的可扩展性。
JSON 数据是以键值对的形式存在的,而且易于阅读和编写。以下是一个简单的 JSON 示例:
  1. {
  2.   "name": "John Doe",
  3.   "age": 30,
  4.   "city": "New York",
  5.   "isStudent": false,
  6.   "grades": [95, 88, 75, 92],
  7.   "address": {
  8.     "street": "123 Main St",
  9.     "zipCode": "10001"
  10.   }
  11. }
复制代码
在这个例子中,JSON 对象包含了一些属性,包括字符串、数字、布尔值、数组和嵌套的对象。

  • "name": "John Doe":字符串键值对。
  • "age": 30:数字键值对。
  • "city": "New York":字符串键值对。
  • "isStudent": false:布尔键值对。
  • "grades": [95, 88, 75, 92]:数组键值对。
  • "address": {...}:嵌套对象。
在实际应用中,JSON 数据通常用于前后端之间的数据交换,或者配置文件的存储。各种编程语言都提供了处理 JSON数据的库或模块。
很早之前大概是两年前,当时为了实现批量管理SSH账号密码并实现自动巡检功能,写过一个简单的命令行工具,通过使用JSON实现对特定主机账号密码与组的管理,如下代码,通过定义AdminDataBase()类,传如数据库文件名database.json实现对特定JSON文件的增删改查功能,在编写该案例后我对JSON的使用变得更加深刻了。
  1. # -*- coding: utf-8 -*-
  2. import os,json
  3. class AdminDataBase(object):
  4.     def __init__(self, database_path):
  5.         self.database_path = database_path
  6.     # 判断数据库文件是否存在,不存则则创建一个.
  7.     def InitDatabase(self):
  8.         if os.path.exists(self.database_path) != None :
  9.             init_database = \
  10.                 {
  11.                     "HostList": [["1000", "127.0.0.1", "username", "password", "22"]],
  12.                     "HostGroup": [{"DefaultGroup": ["1000"]}, ],
  13.                 }
  14.             with open(self.database_path, "w", encoding="utf-8") as fp:
  15.                 fp.write(json.dumps(init_database))
  16.             print("[+] {} 结构已被初始化.".format(self.database_path))
  17.     # table 接收一个列表名,根据列表名输出列表中的数据
  18.     def ShowHostList(self):
  19.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  20.             load_json = json.loads(Read_Pointer.read())
  21.             base = load_json.get("HostList")
  22.             print("-" * 80)
  23.             print("UUID \t 主机地址 \t\t\t 登录用户 \t\t 登录密码 \t 端口")
  24.             print("-" * 80)
  25.             for each in range(0, len(base)):
  26.                 print("{0:4} \t {1:15} \t {2:10} \t {3:10} \t {4:10}"
  27.                       .format(base[each][0], base[each][1], base[each][2], base[each][3], base[each][4]))
  28.         print()
  29.     # 在原来的基础上添加一台新的主机,添加到HostList表中
  30.     def AddHost(self, uuid, address, username, password, port):
  31.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  32.             # 读取 database.json 中的数据到内存中
  33.             load_json = json.loads(Read_Pointer.read())
  34.             # 先读入内存,修改后全部替换进去
  35.             host_list = load_json.get("HostList")
  36.             # 获取到最后一次循环的元素列表
  37.             for each in range(len(host_list) - 1, len(host_list)):
  38.                 # 判断如果UUID不重复则添加,否则跳过添加
  39.                 if (host_list[each][0] != uuid):
  40.                     host_list.append([uuid, address, username, password, port])
  41.                     load_json["HostList"] = host_list
  42.                     # 最后再次将改好的数据,回写到文件保存
  43.                     with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:
  44.                         dump_json = json.dumps(load_json)
  45.                         Write_Pointer.write(dump_json)
  46.                         print("[+] UUID {} 添加成功.".format(uuid))
  47.                 else:
  48.                     print("[-] UUID {} 列表中存在,添加失败.".format(uuid))
  49.                     return 0
  50.     # 根据传入UUID号修改特定主机数据
  51.     def ModifyHost(self,uuid,modify_address,modify_username,modify_password,modify_port):
  52.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  53.             # 读取 database.json 中的数据到内存中
  54.             load_json = json.loads(Read_Pointer.read())
  55.             host_list = load_json.get("HostList")
  56.             # 根据uuid寻找一维数组的下标位置
  57.             index = -1
  58.             for index, value in enumerate(host_list):
  59.                 if value[0] == uuid:
  60.                     print("[*] 已找到UUID {} 所在下标为 {}".format(uuid,index))
  61.                     break
  62.                 else:
  63.                     index = -1
  64.             # 判断是否找到了,找到了则修改
  65.             if index != -1:
  66.                 # 修改指定下标数值,并将修改后的数据放入原始JSON文件中
  67.                 host_list[index] = [uuid,modify_address,modify_username,modify_password,modify_port]
  68.                 load_json["HostList"] = host_list
  69.                 # 最后再次将改好的数据,回写到文件保存
  70.                 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:
  71.                     dump_json = json.dumps(load_json)
  72.                     Write_Pointer.write(dump_json)
  73.                     print("[+] UUID {} 修改完成.".format(uuid))
  74.                 return 0
  75.             else:
  76.                 print("[-] UUID {} 不存在.".format(uuid))
  77.         return 0
  78.     # 根据传入的UUID号删除主机数据,先删除所对的组中的数据,然后在删除主机数据
  79.     def DeleteHost(self,uuid):
  80.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  81.             load_json = json.loads(Read_Pointer.read())
  82.             host_group = load_json.get("HostGroup")
  83.             # 首先在HostGroup中寻找并弹出相应UUID
  84.             for each in range(0,len(host_group)):
  85.                 try:
  86.                     # 循环每个字典
  87.                     for k,v in host_group[each].items():
  88.                         # 判断UUID是否存在于每个字典中
  89.                         if v.count(uuid) != 0:
  90.                             # 移除并放入原始JSON中
  91.                             v.remove(uuid)
  92.                             load_json["HostGroup"] = v
  93.                             # 最后再次将改好的数据,回写到文件保存
  94.                             with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:
  95.                                 dump_json = json.dumps(load_json)
  96.                                 Write_Pointer.write(dump_json)
  97.                                 #print("[+] UUID {} 修改完成.".format(uuid))
  98.                         else:
  99.                             #print("[-] 当前组 {} 不能存在: {}".format(k,uuid))
  100.                             break
  101.                 except Exception:
  102.                     print("[-] 当前组不能存在: {}".format(uuid))
  103.                     return 0
  104.             # ----------------------------------------------
  105.             # 其次寻找HostList中的数据并弹出
  106.             host_list = load_json.get("HostList")
  107.             try:
  108.                 # 根据uuid寻找一维数组的下标位置
  109.                 index = -1
  110.                 for index, value in enumerate(host_list):
  111.                     if value[0] == uuid:
  112.                         #print("[*] 已找到UUID {} 所在下标为 {}".format(uuid,index))
  113.                         break
  114.                     else:
  115.                         index = -1
  116.                 # 如果找到了,则直接根据下标弹出数据
  117.                 if index != -1:
  118.                     host_list.pop(index)
  119.                 load_json["HostList"] = host_list
  120.                 # 最后再次将改好的数据,回写到文件保存
  121.                 with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:
  122.                     dump_json = json.dumps(load_json)
  123.                     Write_Pointer.write(dump_json)
  124.                 print("[+] UUID {} 主机: {} 已被移除.".format(uuid,host_list[index][1]))
  125.                 return 0
  126.             except Exception:
  127.                 return 0
  128.     # 向数据库中的HostGroup字段中添加一个主机组
  129.     def AddHostGroup(self,add_group_name):
  130.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  131.             load_json = json.loads( Read_Pointer.read() )
  132.             group_obj = load_json.get("HostGroup")
  133.             # 循环所有字典中的组
  134.             for each in range(0, len(group_obj)):
  135.                 list_name = str(list(group_obj[each].keys())[0])
  136.                 # print("组节点: {}".format(list_name))
  137.                 # 循环判断表中是否存在指定的组名称
  138.                 if (list_name == add_group_name):
  139.                     print("[-] {} 存在于组中,无法继续添加.".format(list_name))
  140.                     return 0
  141.             # 读取原始JSON文件,并构建回写参数
  142.             tmp = {}
  143.             tmp[add_group_name] = ["1000"]
  144.             group_obj.append(tmp)
  145.             # 回写添加主机组
  146.             with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:
  147.                 dump_json = json.dumps(load_json)
  148.                 Write_Pointer.write(dump_json)
  149.             print("[+] 主机组 {} 已添加".format(add_group_name))
  150.     # 在主机组中删除一个主机组
  151.     def DeleteHostGroup(self,delete_group_name):
  152.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  153.             load_json = json.loads( Read_Pointer.read() )
  154.             group_obj = load_json.get("HostGroup")
  155.             # 循环所有字典中的组
  156.             for each in range(0, len(group_obj)):
  157.                 list_name = str(list(group_obj[each].keys())[0])
  158.                 # 循环判断表中是否存在指定的组名称
  159.                 if (list_name == delete_group_name):
  160.                     # 如果存在于组中,那我们就把他的each弹出列表
  161.                     group_obj.pop(each)
  162.                     # 将弹出后的列表再次写回JSON序列中
  163.                     with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:
  164.                         dump_json = json.dumps(load_json)
  165.                         Write_Pointer.write(dump_json)
  166.                     print("[-] 主机组 {} 已移除.".format(delete_group_name))
  167.                     return 0
  168.             print("[-] 主机组 {} 不存在.".format(delete_group_name))
  169.         return 0
  170.     # 向指定主机组中添加一个主机UUID
  171.     def AddHostGroupOnUUID(self,group_name, uuid):
  172.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  173.             load_json = json.loads( Read_Pointer.read() )
  174.             group_obj = load_json.get("HostGroup")
  175.             # 循环所有字典中的所有组
  176.             for each in range(0, len(group_obj)):
  177.                 list_name = str(list(group_obj[each].keys())[0])
  178.                 # 如果找到了需要添加的组
  179.                 if (list_name == group_name):
  180.                     tmp = group_obj[each]
  181.                     # 获取到原始列表中的数据
  182.                     val = list(tmp.values())[0]
  183.                     # 判断输入的UUID号,是否在val列表中,不在则添加
  184.                     if str(uuid) not in val:
  185.                         val.append(str(uuid))
  186.                         # 将原始数据赋值到新的表中
  187.                         group_obj[each][list_name] = val
  188.                     else:
  189.                         print("[-] UUID {} 已存在于 {} 主机组,添加失败.".format(uuid,group_name))
  190.                         return 0
  191.         with open(self.database_path, "w", encoding="utf-8") as Write_Pointer:
  192.             dump_json = json.dumps(load_json)
  193.             Write_Pointer.write(dump_json)
  194.             print("[+] 向主机组 {} 增加UUID {} 完成".format(group_name, uuid))
  195.         return 0
  196.     # 从指定主机组中删除一个指定的UUID号
  197.     def DeleteHostGroupOnUUID(self,group_name, uuid):
  198.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  199.             load_json =json.loads( Read_Pointer.read() )
  200.             group_obj = load_json.get("HostGroup")
  201.             # 循环所有字典中的组
  202.             for each in range(0, len(group_obj)):
  203.                 list_name = str(list(group_obj[each].keys())[0])
  204.                 # 先来寻找到需要删除的主机组
  205.                 if (list_name == group_name):
  206.                     tmp = group_obj[each]
  207.                     # 寻找指定组中的指定列表
  208.                     val = list(tmp.values())[0]
  209.                     for x in range(0, len(val)):
  210.                         if (val[x] == uuid):
  211.                             print("[*] 搜索UUID: {} 索引值: {}".format(val[x], x))
  212.                             # 将要删除的UUID弹出列表
  213.                             val.pop(x)
  214.                             # 将弹出后的列表重新复制到主机组中
  215.                             group_obj[each][list_name] = val
  216.                             # 保存弹出后的列表到序列中
  217.                             with open(self.database_path, "w", encoding="utf-8") as write_fp:
  218.                                 dump_json = json.dumps(load_json)
  219.                                 write_fp.write(dump_json)
  220.                             print("[+] 从主机组 {} 弹出UUID {} 完成".format(group_name, uuid))
  221.                             return 0
  222.         return 0
  223.     # 输出所有主机组和全部节点信息
  224.     def ShowAllGroup(self):
  225.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  226.             load_json = json.loads( Read_Pointer.read() )
  227.             group_obj = load_json.get("HostGroup")
  228.             # 循环解析所有组,并解析出UUID所对应的主机地址等信息
  229.             for each in range(0, len(group_obj)):
  230.                 for k, v in group_obj[each].items():
  231.                     print("-" * 140)
  232.                     print("主机组: {}".format(k))
  233.                     print("-" * 140)
  234.                     for each in range(0, len(v)):
  235.                         # print("--> UUID: {}".format(v[each]))
  236.                         # 循环判断,拿着组内UUID判断是否存在,如果存在则打印出主机详细信息
  237.                         base_obj = load_json.get("HostList")
  238.                         for x in range(0, len(base_obj)):
  239.                             if (v[each] == base_obj[x][0]):
  240.                                 print("UUID: {0:6} \t 主机地址: {1:15} \t 主机账号: {2:15} \t 主机密码: {3:15} \t 端口: {4:5} \t".
  241.                                       format(base_obj[x][0], base_obj[x][1], base_obj[x][2], base_obj[x][3],
  242.                                              base_obj[x][4]))
  243.                     print("\n")
  244.     # 只显示所有的主机组,包括组中主机数
  245.     def ShowGroup(self):
  246.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  247.             load_json = json.loads( Read_Pointer.read() )
  248.             group_obj = load_json.get("HostGroup")
  249.             print("-" * 80)
  250.             print("{0:20} \t {1:5} \t {2:100}".format("主机组名","主机数","主机列表"))
  251.             print("-" * 80)
  252.             for each in range(0,len(group_obj)):
  253.                 for k,v in group_obj[each].items():
  254.                     print("{0:20} \t {1:5} \t {2:100}".format(k,str(len(v)), str(v)))
  255.         print()
  256.         return 0
  257.     # 对指定的主机组进行Ping测试
  258.     def PingGroup(self,group_name):
  259.         with open(self.database_path, "r", encoding="utf-8") as Read_Pointer:
  260.             load_json = json.loads( Read_Pointer.read() )
  261.             group_obj = load_json.get("HostGroup")
  262.             # 循环遍历主机列表
  263.             for each in range(0, len(group_obj)):
  264.                 for k, v in group_obj[each].items():
  265.                     # 寻找主机组,找到后提取出所有的Value
  266.                     if (k == group_name):
  267.                         item_list = v
  268.                         # 再循环,拿着v的值对base进行循环,提取出其中的用户名密码等
  269.                         for val in range(0, len(item_list)):
  270.                             # 得到循环列表:print(item_list[val])
  271.                             base_obj = load_json.get("HostList")
  272.                             # 循环base表中的计数器
  273.                             for base_count in range(0, len(base_obj)):
  274.                                 # 判断base表中是否存在指定UUID,如果存在则输出其值
  275.                                 if (base_obj[base_count][0] == item_list[val]):
  276.                                     print("地址: {} \t 用户名: {} [ok]".format(base_obj[base_count][1],base_obj[base_count][2]))
  277. if __name__ == "__main__":
  278.     db = AdminDataBase("database.json")
  279.     while True:
  280.         try:
  281.             cmd = str(input("[LyShell] # ")).split()
  282.             if (cmd == ""):
  283.                 continue
  284.             elif (cmd[0] == "exit"):
  285.                 exit(1)
  286.             elif (cmd[0] == "clear"):
  287.                 os.system("cls")
  288.             elif(cmd[0] == "Init"):
  289.                 db.InitDatabase()
  290.             elif(cmd[0] == "ShowHostList"):
  291.                 db.ShowHostList()
  292.             elif(cmd[0] == "ShowGroup"):
  293.                 db.ShowGroup()
  294.             elif(cmd[0] == "ShowAllGroup"):
  295.                 db.ShowAllGroup()
  296.             elif(cmd[0] == "AddHost" or cmd[0] == "ModifyHost"):
  297.                 if (len(cmd) - 1 >= 5):
  298.                     uuid = str(cmd[1]).split("=")[1]
  299.                     address = str(cmd[2]).split("=")[1]
  300.                     username = str(cmd[3]).split("=")[1]
  301.                     password = str(cmd[4]).split("=")[1]
  302.                     port = str(cmd[5]).split("=")[1]
  303.                     if cmd[0] == "AddHost":
  304.                         db.AddHost(uuid,address,username,password,port)
  305.                     elif cmd[0] == "ModifyHost":
  306.                         db.ModifyHost(uuid,address,username,password,port)
  307.             elif(cmd[0] == "DeleteHost"):
  308.                 if(len(cmd)-1 >= 1):
  309.                     uuid = str(cmd[1]).split("=")[1]
  310.                     db.DeleteHost(uuid)
  311.             elif(cmd[0] == "AddHostGroup"):
  312.                 group_name = str(cmd[1]).split("=")[1]
  313.                 db.AddHostGroup(group_name)
  314.             elif(cmd[0] == "DeleteHostGroup"):
  315.                 group_name = str(cmd[1]).split("=")[1]
  316.                 db.DeleteHostGroup(group_name)
  317.             elif(cmd[0] == "AddHostGroupOnUUID"):
  318.                 group_name = str(cmd[1]).split("=")[1]
  319.                 uuid = str(cmd[2]).split("=")[1]
  320.                 db.AddHostGroupOnUUID(group_name,uuid)
  321.             elif(cmd[0] == "DelHostGroupOnUUID"):
  322.                 group_name = str(cmd[1]).split("=")[1]
  323.                 uuid = str(cmd[2]).split("=")[1]
  324.                 db.DeleteHostGroupOnUUID(group_name,uuid)
  325.             elif(cmd[0] == "PingGroup"):
  326.                 group_name = str(cmd[1]).split("=")[1]
  327.                 db.PingGroup(group_name)
  328.             elif (cmd[0] == "help"):
  329.                 print("By: LyShark")
  330.             else:
  331.                 print("Not Command")
  332.         except Exception:
  333.             continue
复制代码
使用案例

管理数据库(AdminDataBase)中的主机信息和主机分组信息。用户通过输入命令来执行不同的操作,如初始化数据库、显示主机列表、添加主机、修改主机信息、删除主机等。
以下是代码的主要功能和命令列表:

  • 初始化数据库:Init
  • 显示主机列表:ShowHostList
  • 显示主机分组:ShowGroup
  • 显示所有主机分组:ShowAllGroup
  • 添加主机:AddHost
  • 修改主机信息:ModifyHost
  • 删除主机:DeleteHost
  • 添加主机分组:AddHostGroup
  • 删除主机分组:DeleteHostGroup
  • 将主机添加到指定分组:AddHostGroupOnUUID
  • 从指定分组删除主机:DelHostGroupOnUUID
  • 按组执行 Ping 操作:PingGroup
  • 清屏:clear
  • 退出程序:exit
  • 帮助:help
Init
初次使用需要执行本命令,对数据库文件进行写出,如下所示;

ShowHostList
用于输出当前主机列表信息,如下图所示;

ShowGroup
用于输出当前主机组,如下图所示;

ShowAllGroup
用于输出所有的主机组以及组内的主机详细信息,如下图所示;

AddHost
添加一个新的主机记录,如下图所示;

ModifyHost
修改一个已有的主机记录,此处以UUID作为修改条件,如下图所示;

DeleteHost
根据一个UUID唯一标识,删除一个已存在的主机记录,如下图所示;

AddHostGroup
新增一个组名,默认会携带1000为初始主机,如下图所示;

DeleteHostGroup
删除一整个主机组,如下图所示;

AddHostGroupOnUUID
根据UUID号将特定主机添加到特定组内,如下图所示;

DelHostGroupOnUUID
根据主机组名,删除特定的UUID,如下图所示;

PingGroup
对特定主机组执行Ping功能测试,此处可以扩展,如下图所示;

总结部分

该案例只是用于学习如何灵活运用JSON实现数据的增删改查,其实在实战中意义不大,因为完全可以使用SQLite这类精简数据库,此案例只是本人为了熟悉JSON的增删改查而写的一个Demo工具。
点击关注,第一时间了解华为云新鲜技术~
 

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

石小疯

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表