农妇山泉一亩田 发表于 2024-11-5 21:17:33

挟制微信谈天纪录并分析还原 —— 帐号信息截取(一)

https://img2024.cnblogs.com/blog/3367775/202411/3367775-20241105154230207-1041155867.png

[*]本工具计划的初志是用来获取微信账号的相关信息并剖析PC版微信的数据库。
[*]程序以 Python 语言开发,可读取、解密、还原微信数据库并帮助用户检察谈天纪录,还可以将其谈天纪录导出为csv、html等格式用于AI训练,自动回复或备份等等作用。下面我们将深入探究这个工具的各个方面及其工作原理。


【完整演示工具下载】
https://www.chwm.vip/index.html?aid=23


​​​
命令行界面演示 (截取帐号信息)
https://img2024.cnblogs.com/blog/3367775/202411/3367775-20241105154813672-1022420159.png
网页界面演示 (帐号信息)
https://img2024.cnblogs.com/blog/3367775/202411/3367775-20241105154824227-9880437.png


部分实现代码
# -*- coding: utf-8 -*-## -------------------------------------------------------------------------------# Name:         wx_info.py# Description:# Author:       Rainbow# Date:         2024/11/05# -------------------------------------------------------------------------------import ctypesimport jsonimport osimport reimport winregfrom typing import List, Unionfrom .utils import verify_key, get_exe_bit, wx_core_errorfrom .utils import get_process_list, get_memory_maps, get_process_exe_path, get_file_version_infofrom .utils import search_memoryfrom .utils import wx_core_loger, CORE_DB_TYPEimport ctypes.wintypes as wintypes# 界说常量PROCESS_QUERY_INFORMATION = 0x0400PROCESS_VM_READ = 0x0010kernel32 = ctypes.WinDLL('kernel32', use_last_error=True)OpenProcess = kernel32.OpenProcessOpenProcess.restype = wintypes.HANDLEOpenProcess.argtypes = CloseHandle = kernel32.CloseHandleCloseHandle.restype = wintypes.BOOLCloseHandle.argtypes = ReadProcessMemory = kernel32.ReadProcessMemoryvoid_p = ctypes.c_void_p# 读取内存中的字符串(key部分)@wx_core_errordef get_key_by_offs(h_process, address, address_len=8):    array = ctypes.create_string_buffer(address_len)    if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return None    address = int.from_bytes(array, byteorder='little')# 逆序转换为int地点(key地点)    key = ctypes.create_string_buffer(32)    if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return None    key_string = bytes(key).hex()    return key_string# 读取内存中的字符串(非key部分)@wx_core_errordef get_info_string(h_process, address, n_size=64):    array = ctypes.create_string_buffer(n_size)    if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return None    array = bytes(array).split(b"\x00") if b"\x00" in array else bytes(array)    text = array.decode('utf-8', errors='ignore')    return text.strip() if text.strip() != "" else None# 读取内存中的字符串(昵称部分name)@wx_core_errordef get_info_name(h_process, address, address_len=8, n_size=64):    array = ctypes.create_string_buffer(n_size)    if ReadProcessMemory(h_process, void_p(address), array, n_size, 0) == 0: return None    address1 = int.from_bytes(array[:address_len], byteorder='little')# 逆序转换为int地点(key地点)    info_name = get_info_string(h_process, address1, n_size)    if info_name != None:      return info_name    array = bytes(array).split(b"\x00") if b"\x00" in array else bytes(array)    text = array.decode('utf-8', errors='ignore')    return text.strip() if text.strip() != "" else None# 读取内存中的wxid@wx_core_errordef get_info_wxid(h_process):    find_num = 100    addrs = search_memory(h_process, br'\\Msg\\FTSContact', max_num=find_num)    wxids = []    for addr in addrs:      array = ctypes.create_string_buffer(80)      if ReadProcessMemory(h_process, void_p(addr - 30), array, 80, 0) == 0: return None      array = bytes(array)# .split(b"\\")      array = array.split(b"\\Msg")      array = array.split(b"\\")[-1]      wxids.append(array.decode('utf-8', errors='ignore'))    wxid = max(wxids, key=wxids.count) if wxids else None    return wxid# 读取内存中的wx_path基于wxid(慢)@wx_core_errordef get_wx_dir_by_wxid(h_process, wxid=""):    find_num = 10    addrs = search_memory(h_process, wxid.encode() + br'\\Msg\\FTSContact', max_num=find_num)    wxid_dir = []    for addr in addrs:      win_addr_len = 260      array = ctypes.create_string_buffer(win_addr_len)      if ReadProcessMemory(h_process, void_p(addr - win_addr_len + 50), array, win_addr_len, 0) == 0: return None      array = bytes(array).split(b"\\Msg")      array = array.split(b"\00")[-1]      wxid_dir.append(array.decode('utf-8', errors='ignore'))    wxid_dir = max(wxid_dir, key=wxid_dir.count) if wxid_dir else None    return wxid_dir@wx_core_errordef get_wx_dir_by_reg(wxid="all"):    """    # 读取 wx_dir (微信文件路径) (快)    :param wxid: 微信id    :return: 返回wx_dir,if wxid="all" return wx_dir else return wx_dir/wxid    """    if not wxid:      return None    w_dir = "MyDocument:"    is_w_dir = False    try:      key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)      value, _ = winreg.QueryValueEx(key, "FileSavePath")      winreg.CloseKey(key)      w_dir = value      is_w_dir = True    except Exception as e:      w_dir = "MyDocument:"    if not is_w_dir:      try:            user_profile = os.environ.get("USERPROFILE")            path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users", "config",                                       "3ebffe94.ini")            with open(path_3ebffe94, "r", encoding="utf-8") as f:                w_dir = f.read()            is_w_dir = True      except Exception as e:            w_dir = "MyDocument:"    if w_dir == "MyDocument:":      try:            # 打开注册表路径            key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,                                 r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")            documents_path = winreg.QueryValueEx(key, "Personal")# 读取文档实际目录路径            winreg.CloseKey(key)# 关闭注册表            documents_paths = os.path.split(documents_path)            if "%" in documents_paths:                w_dir = os.environ.get(documents_paths.replace("%", ""))                w_dir = os.path.join(w_dir, os.path.join(*documents_paths))                # print(1, w_dir)            else:                w_dir = documents_path      except Exception as e:            profile = os.environ.get("USERPROFILE")            w_dir = os.path.join(profile, "Documents")    wx_dir = os.path.join(w_dir, "WeChat Files")    if wxid and wxid != "all":      wxid_dir = os.path.join(wx_dir, wxid)      return wxid_dir if os.path.exists(wxid_dir) else None    return wx_dir if os.path.exists(wx_dir) else Nonedef get_wx_dir(wxid: str = "", Handle=None):    """    综合运用多种方法获取wx_path    优先调用 get_wx_dir_by_reg (该方法速度快)    次要调用 get_wx_dir_by_wxid (该方法通过搜索内存举行,速度较慢)    """    if wxid:      wx_dir = get_wx_dir_by_reg(wxid) if wxid else None      if wxid is not None and wx_dir is None and Handle:# 通过wxid获取wx_path,如果wx_path为空则通过wxid获取wx_path            wx_dir = get_wx_dir_by_wxid(Handle, wxid=wxid)    else:      wx_dir = get_wx_dir_by_reg()    return wx_dir@wx_core_errordef get_key_by_mem_search(pid, db_path, addr_len):    """    获取key (慢)    :param pid: 历程id    :param db_path: 微信数据库路径    :param addr_len: 地点长度    :return: 返回key    """    def read_key_bytes(h_process, address, address_len=8):      array = ctypes.create_string_buffer(address_len)      if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return None      address = int.from_bytes(array, byteorder='little')# 逆序转换为int地点(key地点)      key = ctypes.create_string_buffer(32)      if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return None      key_bytes = bytes(key)      return key_bytes    phone_type1 = "iphone\x00"    phone_type2 = "android\x00"    phone_type3 = "ipad\x00"    MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")    start_adress = 0x7FFFFFFFFFFFFFFF    end_adress = 0    memory_maps = get_memory_maps(pid)    for module in memory_maps:      if module.FileName and 'WeChatWin.dll' in module.FileName:            s = module.BaseAddress            e = module.BaseAddress + module.RegionSize            start_adress = s if s < start_adress else start_adress            end_adress = e if e > end_adress else end_adress    hProcess = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)    type1_addrs = search_memory(hProcess, phone_type1.encode(), max_num=2, start_address=start_adress,                              end_address=end_adress)    type2_addrs = search_memory(hProcess, phone_type2.encode(), max_num=2, start_address=start_adress,                              end_address=end_adress)    type3_addrs = search_memory(hProcess, phone_type3.encode(), max_num=2, start_address=start_adress,                              end_address=end_adress)    type_addrs = []    if len(type1_addrs) >= 2: type_addrs += type1_addrs    if len(type2_addrs) >= 2: type_addrs += type2_addrs    if len(type3_addrs) >= 2: type_addrs += type3_addrs    if len(type_addrs) == 0: return None    type_addrs.sort()# 从小到大排序    for i in type_addrs[::-1]:      for j in range(i, i - 2000, -addr_len):            key_bytes = read_key_bytes(hProcess, j, addr_len)            if key_bytes == None:                continue            if verify_key(key_bytes, MicroMsg_path):                return key_bytes.hex()    CloseHandle(hProcess)    return None@wx_core_errordef get_wx_key(key: str = "", wx_dir: str = "", pid=0, addrLen=8):    """    获取key (慢)    :param key: 微信key    :param wx_dir: 微信文件路径    :param pid: 历程id    :param addrLen: 地点长度    :return: 返回key    """    isKey = verify_key(      bytes.fromhex(key),      os.path.join(wx_dir, "MSG", "MicroMsg.db")) if key is not None and wx_dir is not None else False    if wx_dir is not None and not isKey:      key = get_key_by_mem_search(pid, wx_dir, addrLen)    return key@wx_core_errordef get_info_details(pid, WX_OFFS: dict = None):    path = get_process_exe_path(pid)    rd = {'pid': pid, 'version': get_file_version_info(path),          "account": None, "mobile": None, "nickname": None, "mail": None,          "wxid": None, "key": None, "wx_dir": None}    try:      bias_list = WX_OFFS.get(rd['version'], None)      Handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)      addrLen = get_exe_bit(path) // 8      if not isinstance(bias_list, list) or len(bias_list)List:    r"""    获取微信数据库路径    :param msg_dir:微信数据库目录 eg: C:\Users\user\Documents\WeChat Files (非wxid目录)    :param db_types:必要获取的数据库类型,如果为空,则获取所有数据库    :param wxids:微信id列表,如果为空,则获取所有wxid下的数据库    :return: [{"wxid": wxid, "db_type": db_type, "db_path": db_path, "wxid_dir": wxid_dir}, ...]    """    result = []    if not msg_dir or not os.path.exists(msg_dir):      wx_core_loger.warning(f"[-] 微信文件目录不存在: {msg_dir}, 将使用默认路径")      msg_dir = get_wx_dir_by_reg(wxid="all")    if not os.path.exists(msg_dir):      wx_core_loger.error(f"[-] 目录不存在: {msg_dir}", exc_info=True)      return result    wxids = wxids.split(";") if isinstance(wxids, str) else wxids    if not isinstance(wxids, list) or len(wxids)
页: [1]
查看完整版本: 挟制微信谈天纪录并分析还原 —— 帐号信息截取(一)