This commit is contained in:
睿 安
2026-01-21 16:48:36 +08:00
commit abba5cb273
246 changed files with 57473 additions and 0 deletions

1
app/util/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .path import get_abs_path

View File

@@ -0,0 +1,333 @@
import html
import xml.etree.ElementTree as ET
import lz4.block
import requests
import re
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from app.util.protocbuf.msg_pb2 import MessageBytesExtra
from ..util.file import get_file
def decompress_CompressContent(data):
    """
    Decompress an LZ4-block-compressed ``MsgCompressContent`` payload.

    :param data: raw compressed bytes. Anything that is not ``bytes``
        (including ``None``) or is empty is treated as "no content".
    :return: the decoded text with NUL characters removed, or ``""`` when the
        input is unusable or decompression/decoding fails.
    """
    if not isinstance(data, bytes) or not data:
        return ""
    try:
        # The real uncompressed size is not available here, so 1024x the
        # compressed length is passed as the output-size bound.
        dst = lz4.block.decompress(data, uncompressed_size=len(data) << 10)
        decoded_string = dst.decode().replace("\x00", "")  # Remove any null characters
    except Exception:
        # Best effort: a corrupt payload yields an empty string, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        print(
            "Decompression failed: potentially corrupt input or insufficient buffer size."
        )
        return ""
    return decoded_string
def escape_js_and_html(input_str):
    """Escape text so it can sit inside HTML that is embedded in a JS string.

    Falsy input (``None``, ``""``) yields ``""``. Otherwise ``&``, ``<`` and
    ``>`` are HTML-escaped, then backslashes, quotes, newlines, carriage
    returns and tabs are replaced by their backslash escape sequences.
    """
    if not input_str:
        return ""
    # HTML-escape first; quotes are left for the JavaScript step below.
    escaped = html.escape(input_str, quote=False)
    # Backslash goes first so the backslashes introduced by the later
    # substitutions are not doubled.
    js_substitutions = (
        ("\\", "\\\\"),
        ("'", r"\'"),
        ('"', r"\""),
        ("\n", r"\n"),
        ("\r", r"\r"),
        ("\t", r"\t"),
    )
    for raw, replacement in js_substitutions:
        escaped = escaped.replace(raw, replacement)
    return escaped
def _reply_error():
    """Build the placeholder result returned when a quoted reply cannot be parsed."""
    return {
        "type": 57,
        "title": "发生错误",
        "refer": {
            "type": "1",
            "content": "引用错误",
            "displayname": "用户名",
        },
        "is_error": True,
    }


def parser_reply(data: bytes):
    """Parse a compressed "reply/quote" app message.

    :param data: compressed message content, as accepted by
        ``decompress_CompressContent``.
    :return: dict with ``type`` (int), ``title``, ``refer`` and ``is_error``.
        ``refer`` is ``None`` unless the quoted message has type 1, in which
        case it holds the quoted ``type``, ``content`` and ``displayname``.
        When the payload cannot be decompressed or parsed, a placeholder
        dict with ``is_error`` set to ``True`` is returned instead.
    """
    xml_content = decompress_CompressContent(data)
    if not xml_content:
        return _reply_error()
    try:
        appmsg = ET.XML(xml_content).find("appmsg")
        msg_type = int(appmsg.find("type").text)
        title = appmsg.find("title").text
        refermsg = appmsg.find("refermsg")
        refermsg_content = refermsg.find("content").text
        refermsg_type = int(refermsg.find("type").text)
        refermsg_displayname = refermsg.find("displayname").text
        refer = None
        if refermsg_type == 1:
            refer = {
                "type": refermsg_type,
                # An empty <content/> has text None; treat it as empty text
                # instead of failing the whole message.
                "content": (refermsg_content or "").lstrip("\n"),
                "displayname": refermsg_displayname,
            }
    except Exception:
        # Malformed XML, a missing element or a non-numeric type.
        return _reply_error()
    return {
        "type": msg_type,
        "title": title,
        "refer": refer,
        "is_error": False,
    }
def music_share(data: bytes):
    """Parse a compressed music-share app message.

    :param data: compressed message content.
    :return: on success a dict with ``type``, escaped ``title`` (shortened to
        38 characters plus ``...`` when 39 or longer), escaped ``artist``,
        ``link_url``, ``audio_url``, escaped ``website_name`` and
        ``is_error`` False; on any failure
        ``{"type": 3, "title": "发生错误", "is_error": True}``.
    """
    failure = {"type": 3, "title": "发生错误", "is_error": True}
    xml_content = decompress_CompressContent(data)
    if not xml_content:
        return failure
    try:
        appmsg = ET.XML(xml_content).find("appmsg")
        msg_type = int(appmsg.find("type").text)
        title = appmsg.find("title").text
        if len(title) >= 39:
            title = title[:38] + "..."
        artist = appmsg.find("des").text
        link_url = appmsg.find("url").text  # page link
        audio_url = get_audio_url(appmsg.find("dataurl").text)  # playback address
        website_name = get_website_name(link_url)
        result = {
            "type": msg_type,
            "title": escape_js_and_html(title),
            "artist": escape_js_and_html(artist),
            "link_url": link_url,
            "audio_url": audio_url,
            "website_name": escape_js_and_html(website_name),
            "is_error": False,
        }
        return result
    except Exception as e:
        print(f"Music Share Error: {e}")
        return failure
def share_card(bytesExtra, compress_content_):
    """Extract the display fields of a shared link/card message.

    :param bytesExtra: serialized ``MessageBytesExtra`` protobuf of the message.
    :param compress_content_: compressed XML content of the message.
    :return: dict with escaped ``title``, ``description``, ``url`` and
        ``app_name`` plus ``thumbnail`` and ``app_logo``. This is best
        effort: fields that could not be extracted before a failure stay
        empty strings.
    """
    title, des, url, show_display_name, thumbnail, app_logo = "", "", "", "", "", ""
    try:
        xml = decompress_CompressContent(compress_content_)
        root = ET.XML(xml)
        appmsg = root.find("appmsg")
        title = appmsg.find("title").text
        des_node = appmsg.find("des")
        des = des_node.text if des_node is not None else ""
        url = appmsg.find("url").text
        appinfo = root.find("appinfo")
        display_node = appmsg.find("sourcedisplayname")
        sourceusername = appmsg.find("sourceusername")
        if display_node is not None:
            show_display_name = display_node.text
        elif appinfo is not None:
            show_display_name = appinfo.find("appname").text
        msg_bytes = MessageBytesExtra()
        msg_bytes.ParseFromString(bytesExtra)
        for tmp in msg_bytes.message2:
            # field1 is the entry kind and field2 its string value; the
            # first backslash-separated path component is dropped.
            if tmp.field1 == 3:
                thumbnail = "\\".join(tmp.field2.split("\\")[1:])
            # The original compared field2 (the string value) with 4, which
            # can never match; the kind lives in field1.
            if tmp.field1 == 4:
                app_logo = "\\".join(tmp.field2.split("\\")[1:])
        if sourceusername is not None:
            from app.DataBase import micro_msg_db  # importing at module top causes a circular import

            contact = micro_msg_db.get_contact_by_username(sourceusername.text)
            if contact:
                app_logo = contact[7]
    except Exception as e:
        # Keep whatever was parsed so far, without hiding
        # KeyboardInterrupt/SystemExit the way `finally: return` did.
        print(f"Share Card Error: {e}")
    return {
        "title": escape_js_and_html(title),
        "description": escape_js_and_html(des),
        "url": escape_js_and_html(url),
        "app_name": escape_js_and_html(show_display_name),
        "thumbnail": thumbnail,
        "app_logo": app_logo,
    }
def transfer_decompress(compress_content_):
    """Extract the payment fields of a money-transfer message.

    :param compress_content_: compressed XML content of the message.
    :return: dict with
        ``feedesc``: amount as a string including a currency-symbol prefix
        (per the original note, only tested with RMB "¥");
        ``pay_memo``: escaped transfer memo;
        ``receiver_username``: wxid of the receiver (may be empty, so do not
        rely on it);
        ``paysubtype``: int, 1 = transfer sent, 3 = transfer accepted,
        4 = transfer refunded; stays ``""`` when parsing fails.
        Fields that could not be parsed keep their empty-string default.
    """
    feedesc, pay_memo, receiver_username, paysubtype = "", "", "", ""
    try:
        xml = decompress_CompressContent(compress_content_)
        wcpayinfo = ET.XML(xml).find("appmsg").find("wcpayinfo")
        paysubtype = int(wcpayinfo.find("paysubtype").text)
        feedesc = wcpayinfo.find("feedesc").text
        pay_memo = wcpayinfo.find("pay_memo").text
        receiver_username = wcpayinfo.find("receiver_username").text
    except Exception as e:
        # Best effort: return what was parsed so far. Unlike the previous
        # `finally: return`, this does not hide KeyboardInterrupt/SystemExit.
        print(f"Transfer Decompress Error: {e}")
    return {
        "feedesc": feedesc,
        "pay_memo": escape_js_and_html(pay_memo),
        "receiver_username": receiver_username,
        "paysubtype": paysubtype,
    }
def call_decompress(is_send, bytes_extra, display_content, str_content):  # audio/video call
    """Describe an audio/video call message.

    :param is_send: truthy when the local user sent the message.
    :param bytes_extra: serialized ``MessageBytesExtra`` protobuf.
    :param display_content: text to display; only computed here when it is
        the empty string.
    :param str_content: call status code as a string (``"11"`` means the
        call connected).
    :return: dict with ``call_type`` (per the original note: 0 = video,
        1 = voice, 2 = unknown) and ``display_content``.
    """
    call_type, call_length = 2, 0
    extra = MessageBytesExtra()
    extra.ParseFromString(bytes_extra)
    # message2 entries: kind 3 carries the call type, kind 4 the duration.
    for entry in extra.message2:
        if entry.field1 == 3:
            call_type = int(entry.field2)
        elif entry.field1 == 4:
            call_length = int(entry.field2)
    if display_content == "":
        if str_content == "11":
            hours, remainder = divmod(call_length, 3600)
            minutes, seconds = divmod(remainder, 60)
            hour_part = f"{hours:02d}:" if hours else ""
            display_content = f"通话时长 {hour_part}{minutes:02d}:{seconds:02d}"
        else:
            status_text = {
                "5": ("" if is_send else "对方") + "已取消",
                "8": ("对方" if is_send else "") + "已拒绝",
                "7": "已在其他设备接听",
                "12": "已在其他设备拒绝",
            }
            display_content = status_text.get(
                str_content,
                "未知类型,您可以把这条消息对应的微信界面消息反馈给我们",
            )
    return {
        "call_type": call_type,
        "display_content": display_content,
    }
def get_website_name(url, timeout=10):
    """Best-effort lookup of a site name from the HTML ``<title>`` of a URL.

    The site root (scheme + host) is requested without following redirects.
    On 200 its title is used; on 302 the ``Location`` target is fetched
    once; on any other status the original ``url`` is tried. When the title
    contains ``-``, only the part after the first ``-`` is kept.

    :param url: page URL taken from the message.
    :param timeout: seconds to wait for each HTTP request, so an
        unresponsive server cannot block the caller forever.
    :return: the site name, or ``""`` when it could not be determined.
    """
    parsed_url = urlparse(url)
    domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
    website_name = ""
    try:
        response = requests.get(domain, allow_redirects=False, timeout=timeout)
        if response.status_code == 200:
            soup = BeautifulSoup(response.content, "html.parser")
            website_name = soup.title.string.strip()
        elif response.status_code == 302:
            domain = response.headers["Location"]
            response = requests.get(domain, allow_redirects=False, timeout=timeout)
            soup = BeautifulSoup(response.content, "html.parser")
            website_name = soup.title.string.strip()
        else:
            response = requests.get(url, allow_redirects=False, timeout=timeout)
            if response.status_code == 200:
                soup = BeautifulSoup(response.content, "html.parser")
                website_name = soup.title.string.strip()
        index = website_name.find("-")
        if index != -1:  # a "-" was found: keep what follows it
            website_name = website_name[index + 1 :].strip()
    except Exception as e:
        # Network errors, timeouts and pages without a <title> end up here.
        print(f"Get Website Info Error: {e}")
    return website_name
def get_audio_url(url, timeout=10):
    """Resolve the playable address behind a music ``dataurl``.

    The URL is requested without following redirects; a 302 response's
    ``Location`` header is the address returned.

    :param url: the ``dataurl`` value from the message.
    :param timeout: seconds to wait for the HTTP request, so an unresponsive
        server cannot block the caller forever.
    :return: the redirect target, or ``""`` when the server answers with
        anything but 302 or the request fails.
    """
    path = ""
    try:
        response = requests.get(url, allow_redirects=False, timeout=timeout)
        status = response.status_code
        if status == 302:
            path = response.headers["Location"]
        elif status == 200:
            # The original treats a direct 200 as "music file expired".
            print("音乐文件已失效,url:" + url)
        else:
            print("音乐文件地址获取失败,url:" + url + ",状态码" + str(status))
    except Exception as e:
        print(f"Get Audio Url Error: {e}")
    return path
def file(bytes_extra, compress_content, output_path):
    """Parse a file-attachment app message and locate the file via ``get_file``.

    :param bytes_extra: passed through to ``get_file``.
    :param compress_content: compressed XML content of the message.
    :param output_path: passed through to ``get_file``.
    :return: on success a dict with ``type``, escaped ``file_name``
        (characters that are invalid in file names replaced by ``_``),
        human-readable ``file_len``, ``file_ext``, ``file_path``, escaped
        ``app_name`` and ``is_error`` False; on any failure
        ``{"type": 6, "title": "发生错误", "is_error": True}``.
    """
    xml_content = decompress_CompressContent(compress_content)
    if not xml_content:
        return {"type": 6, "title": "发生错误", "is_error": True}
    try:
        root = ET.XML(xml_content)
        appmsg = root.find("appmsg")
        msg_type = int(appmsg.find("type").text)
        # Collapse every run of path-hostile characters into one underscore.
        safe_name = re.sub(r'[\\/:*?"<>|\r\n]+', "_", appmsg.find("title").text)
        appattach = appmsg.find("appattach")
        readable_size = format_bytes(int(appattach.find("totallen").text))
        file_ext = appattach.find("fileext").text
        app_name = ""
        app_info = root.find("appinfo")
        if app_info is not None:
            app_name = app_info.find("appname").text
            if app_name is None:
                app_name = ""
        file_path = get_file(bytes_extra, safe_name, output_path)
        return {
            "type": msg_type,
            "file_name": escape_js_and_html(safe_name),
            "file_len": readable_size,
            "file_ext": file_ext,
            "file_path": file_path,
            "app_name": escape_js_and_html(app_name),
            "is_error": False,
        }
    except Exception as e:
        print(f"File Get Info Error: {e}")
        return {"type": 6, "title": "发生错误", "is_error": True}
def format_bytes(size):
    """Format a byte count as ``"<value with 2 decimals> <unit>"``.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (GB) is reached.
    """
    units = ["B", "KB", "MB", "GB"]
    value = size
    unit_index = 0
    last_index = len(units) - 1
    while value >= 1024 and unit_index < last_index:
        value = value / 1024
        unit_index += 1
    return f"{value:.2f} {units[unit_index]}"

318
app/util/emoji.py Normal file
View File

@@ -0,0 +1,318 @@
# -*- coding: utf-8 -*-
"""
emoji.py
!!!声明:
由于表情包并不属于个人,并且其可能具有版权风险,你只有浏览权没有拥有权
另外访问腾讯API可能会给腾讯服务器造成压力
所以禁止任何人以任何方式修改或间接修改该文件,违者后果自负
"""
import os
import re
import traceback
import xml.etree.ElementTree as ET
import sqlite3
import threading
from PyQt5.QtGui import QPixmap
import requests
from app.log import log, logger
# Guards the database cursor used by the Emotion class in this module.
lock = threading.Lock()
db_path = "./app/Database/Msg/Emotion.db"
root_path = "./data/emoji/"
# Create ./data and ./data/emoji in one call; exist_ok avoids the race in
# the previous os.path.exists() + os.mkdir() sequence.
os.makedirs(root_path, exist_ok=True)
@log
def get_image_format(header):
    """Identify an image format from the leading bytes of a file.

    :param header: the first bytes of the data (callers pass 8 bytes).
    :return: ``"jpeg"``, ``"png"``, ``"gif"`` or ``"bmp"``; ``None`` when no
        known signature matches.
    """
    # (magic number, format name); extend this table to support more formats.
    signatures = (
        (b"\xFF\xD8\xFF", "jpeg"),
        (b"\x89\x50\x4E\x47\x0D\x0A\x1A\x0A", "png"),
        (b"\x47\x49\x46", "gif"),
        (b"\x42\x4D", "bmp"),
    )
    matches = (name for magic, name in signatures if header.startswith(magic))
    return next(matches, None)
@log
def parser_xml(xml_string):
    """Extract the emoji attributes from a message's XML payload.

    If the string does not parse as XML, the ``<msg>...</msg>`` span is cut
    out (when present), bare ``&`` characters are escaped, and parsing is
    retried once.

    :param xml_string: message content; must be a ``str``.
    :return: dict with ``width``, ``height``, ``cdnurl``, ``thumburl``
        (falls back to ``cdnurl`` when empty) and lower-cased ``md5``
        (falls back to ``androidmd5``; ``""`` when neither is present).
    :raises xml.etree.ElementTree.ParseError: if the retry fails as well.
    """
    assert isinstance(xml_string, str)
    try:
        root = ET.fromstring(xml_string)
    except ET.ParseError:
        res = re.search('<msg>.*</msg>', xml_string)
        if res:
            xml_string = res.group()
        root = ET.fromstring(xml_string.replace("&", "&amp;"))
    emoji = root.find("./emoji")
    md5 = emoji.get("md5")
    cdnurl = emoji.get("cdnurl")
    thumburl = emoji.get("thumburl")
    return {
        "width": emoji.get("width"),
        "height": emoji.get("height"),
        "cdnurl": cdnurl,
        "thumburl": thumburl if thumburl else cdnurl,
        # Both attributes may be absent; the trailing "" keeps .lower() safe.
        "md5": (md5 or emoji.get("androidmd5") or "").lower(),
    }
def singleton(cls):
    """Class decorator that replaces ``cls`` with a zero-argument factory.

    The factory builds the instance on its first call and hands back that
    same object on every later call.
    """
    created = {}

    def inner():
        try:
            return created[cls]
        except KeyError:
            instance = created[cls] = cls()
            return instance

    return inner
# There must only ever be one instance of this class.
@singleton
class Emotion:
    """Accessor for the emoji database at ``db_path``.

    Queries are serialised through the module-level ``lock``. When the
    database file does not exist the object stays unopened and the lookup
    methods return ``""``.
    """

    _THUMB_URL_SQL = """
        select
            case
                when thumburl is NULL or thumburl = '' then cdnurl
                else thumburl
            end as selected_url
        from CustomEmotion
        where md5 = ?
    """
    _CDN_URL_SQL = """
        select CDNUrl
        from CustomEmotion
        where md5 = ?
    """

    def __init__(self):
        self.DB = None
        self.cursor: sqlite3.Cursor | None = None
        self.open_flag = False
        self.init_database()

    def init_database(self):
        """Open the database and create the cursor, once, if the file exists."""
        if not self.open_flag:
            if os.path.exists(db_path):
                self.DB = sqlite3.connect(db_path, check_same_thread=False)
                # Create the cursor.
                self.cursor = self.DB.cursor()
                self.open_flag = True
                # NOTE(review): this releases the module lock whenever it is
                # held, even by another thread; intent unclear, kept as-is.
                if lock.locked():
                    lock.release()

    def get_emoji_url(self, md5: str, thumb: bool) -> str | bytes:
        """Look up an emoji for downloading; the result is a URL or raw bytes.

        ``CustomEmotion`` is queried first. When it has no row for ``md5``
        or the query fails, the ``Thumb``/``Data`` column of ``EmotionItem``
        is read using the upper-cased md5. Returns ``""`` when nothing is
        found or the database is not open.
        """
        with lock:
            if self.cursor is None:
                return ""
            try:
                self.cursor.execute(
                    self._THUMB_URL_SQL if thumb else self._CDN_URL_SQL, [md5]
                )
                row = self.cursor.fetchone()
            except sqlite3.Error:
                # Deliberate fallback: try the EmotionItem table instead.
                row = None
            if row is not None:
                return row[0]
            md5 = md5.upper()
            sql = f"""
                select {"Thumb" if thumb else "Data"}
                from EmotionItem
                where md5 = ?
            """
            self.cursor.execute(sql, [md5])
            res = self.cursor.fetchone()
            return res[0] if res else ""

    def get_emoji_URL(self, md5: str, thumb: bool):
        """Look up only the URL of an emoji; ``""`` when unavailable."""
        with lock:
            if self.cursor is None:
                return ""
            try:
                self.cursor.execute(
                    self._THUMB_URL_SQL if thumb else self._CDN_URL_SQL, [md5]
                )
                row = self.cursor.fetchone()
            except sqlite3.Error:
                return ""
            return row[0] if row else ""

    def close(self):
        """Close the database connection if it is open."""
        if self.open_flag:
            with lock:
                self.open_flag = False
                self.DB.close()

    def __del__(self):
        self.close()
@log
def download(url, output_dir, name, thumb=False, timeout=30):
    """Download ``url`` into ``output_dir`` and return the path written.

    The extension is chosen from the image signature of the downloaded
    bytes. Recognised images are saved as ``[th_]<name>.<format>`` (the
    ``th_`` prefix is used when ``thumb`` is true); unrecognised data is
    saved as plain ``<name>``.

    :param timeout: seconds to wait for the HTTP request, so an unresponsive
        server cannot block the caller forever.
    """
    resp = requests.get(url, timeout=timeout)
    payload = resp.content
    image_format = get_image_format(payload[:8])
    if image_format:
        prefix = "th_" if thumb else ""
        output_path = os.path.join(output_dir, prefix + name + "." + image_format)
    else:
        output_path = os.path.join(output_dir, name)
    with open(output_path, "wb") as f:
        f.write(payload)
    return output_path
def get_most_emoji(messages):
    """Find the emoji that occurs most often in ``messages``.

    :param messages: iterable of message rows whose index 7 holds the emoji
        XML.
    :return: ``(url, count)`` of the most frequent emoji (the first one seen
        wins ties), or ``("", 0)`` when no emoji could be counted.
    """
    tally = {}  # md5 -> [count, emoji_info of the first occurrence]
    for msg in messages:
        emoji_info = parser_xml(msg[7])
        if emoji_info is None:
            continue
        md5 = emoji_info["md5"]
        if not md5:
            continue
        entry = tally.get(md5)
        if entry is None:
            tally[md5] = [1, emoji_info]
        else:
            entry[0] += 1
    if not tally:
        return "", 0
    # max() returns the first maximal item in insertion order, the same
    # winner the previous stable descending sort produced.
    md5, (num, emoji_info) = max(tally.items(), key=lambda item: item[1][0])
    url = emoji_info["cdnurl"]
    if not url:
        url = Emotion().get_emoji_url(md5, False)
    return url, num
def _missing_emoji_path(output_dir):
    """Return the 404 placeholder image inside ``output_dir``, creating it if absent."""
    placeholder = os.path.join(output_dir, "404.png")
    if not os.path.exists(placeholder):
        QPixmap(":/icons/icons/404.png").save(placeholder)
    return placeholder


def get_emoji(xml_string, thumb=True, output_path=root_path) -> str:
    """Return a local file path for the emoji described by ``xml_string``.

    A previously saved file is reused when present. Otherwise the emoji is
    downloaded from its URL or written from the bytes stored in the emoji
    database. When nothing works, the path of a ``404.png`` placeholder in
    ``output_path`` is returned.
    """
    try:
        emoji_info = parser_xml(xml_string)
        md5 = emoji_info["md5"]
        prefix = "th_" if thumb else ""
        # ".bmp" is included because get_image_format can report it too.
        for ext in (".png", ".gif", ".jpeg", ".bmp"):
            cached = os.path.join(output_path, prefix + md5 + ext)
            if os.path.exists(cached):
                return cached
        url = emoji_info["thumburl"] if thumb else emoji_info["cdnurl"]
        if not url:
            url = Emotion().get_emoji_url(md5, thumb)
        if isinstance(url, str) and url:
            print("下载表情包ing:", url)
            emoji_path = download(url, output_path, md5, thumb)
            # download is wrapped by @log, whose behaviour is not visible
            # here; a falsy result falls through to the placeholder.
            if emoji_path:
                return emoji_path
        elif isinstance(url, bytes):
            image_format = get_image_format(url[:8])
            # Use a separate name for the file so output_path stays the
            # directory for the placeholder fallback below.
            if image_format:
                file_path = os.path.join(
                    output_path, prefix + md5 + "." + image_format
                )
            else:
                file_path = os.path.join(output_path, md5)
            with open(file_path, "wb") as f:
                f.write(url)
            print("表情包数据库加载", file_path)
            return file_path
        else:
            print("!!!未知表情包数据,信息:", xml_string, emoji_info, url)
    except Exception:
        logger.error(traceback.format_exc())
    return _missing_emoji_path(output_path)
def get_emoji_path(xml_string, thumb=True, output_path=root_path) -> str:
    """Return the local path an emoji is (or would be) stored at, without downloading.

    The first candidate file that exists on disk is returned. When none
    exists yet, the ``.png`` candidate is returned, which is what the
    previous implementation always produced. On failure the path of
    ``404.png`` in ``output_path`` is returned.
    """
    try:
        emoji_info = parser_xml(xml_string)
        md5 = emoji_info["md5"]
        prefix = "th_" if thumb else ""
        candidates = [
            os.path.join(output_path, prefix + md5 + ext)
            for ext in (".png", ".gif", ".jpeg", ".bmp")
        ]
        for candidate in candidates:
            if os.path.exists(candidate):
                return candidate
        return candidates[0]
    except Exception:
        logger.error(traceback.format_exc())
        return os.path.join(output_path, "404.png")
def get_emoji_url(xml_string, thumb=True) -> str:
    """Return only the URL of an emoji; nothing is downloaded.

    The URL from the XML (``thumburl`` or ``cdnurl``) is used when present,
    otherwise the emoji database is queried. On failure the error is logged
    and ``"./emoji/404.png"`` is returned.
    """
    try:
        emoji_info = parser_xml(xml_string)
        url = emoji_info["thumburl" if thumb else "cdnurl"]
        if not url:
            url = Emotion().get_emoji_URL(md5=emoji_info["md5"], thumb=thumb)
        return url
    except Exception:
        logger.error(traceback.format_exc())
        return "./emoji/404.png"
if __name__ == "__main__":
    # Manual smoke test. Earlier experiments that were left here commented
    # out parsed a sample emoji <msg> payload, downloaded its cdnurl and
    # thumburl into ./data/emoji/, queried Emotion().get_emoji_url() for a
    # known md5, and called get_emoji() with thumb=True/False.
    print(parser_xml(""))

View File

View File

@@ -0,0 +1,176 @@
import csv
import html
import os
import shutil
import sys
import filecmp
from PyQt5.QtCore import pyqtSignal, QThread
from app.config import OUTPUT_DIR
from app.person import Me, Contact
# Import-time side effect: make sure <OUTPUT_DIR>/聊天记录 exists.
os.makedirs(os.path.join(OUTPUT_DIR, '聊天记录'), exist_ok=True)
def set_global_font(doc, font_name):
    """Set ``font_name`` on the 'Normal' style and on every existing run of ``doc``."""
    # Update the base style first.
    doc.styles['Normal'].font.name = font_name
    # Then override the font of each run in each paragraph.
    every_run = (run for paragraph in doc.paragraphs for run in paragraph.runs)
    for run in every_run:
        run.font.name = font_name
def makedirs(path):
    """Create the export directory layout under ``path`` and sync the icon set.

    Creates ``path`` with the sub-folders image, emoji, video, voice, file,
    avatar, music and icon, then copies every file below the bundled
    ``app/resources/data/icons`` directory into ``<path>/icon``, keeping the
    relative layout. A file is copied when it is missing or its content
    differs from the bundled one.
    """
    os.makedirs(path, exist_ok=True)
    for sub in ('image', 'emoji', 'video', 'voice', 'file', 'avatar', 'music', 'icon'):
        os.makedirs(os.path.join(path, sub), exist_ok=True)
    resource_dir = os.path.join('app', 'resources', 'data', 'icons')
    if not os.path.exists(resource_dir):
        # Not running from the source tree: use sys._MEIPASS when it is set
        # (the original comment calls this the packaged resource directory),
        # otherwise this file's own directory.
        bundle_root = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
        resource_dir = os.path.join(bundle_root, 'app', 'resources', 'data', 'icons')
    target_folder = os.path.join(path, 'icon')
    # Copy the required icons.
    for root, _dirs, filenames in os.walk(resource_dir):
        target_path = os.path.join(target_folder, os.path.relpath(root, resource_dir))
        if filenames:
            # Nested icon folders do not exist in the target yet; without
            # this, shutil.copy fails for every file below the top level.
            os.makedirs(target_path, exist_ok=True)
        for filename in filenames:
            source_file_path = os.path.join(root, filename)
            target_file_path = os.path.join(target_path, filename)
            # Copy when missing, or overwrite when the content differs.
            if not os.path.exists(target_file_path) or not filecmp.cmp(
                source_file_path, target_file_path, shallow=False
            ):
                shutil.copy(source_file_path, target_file_path)
def escape_js_and_html(input_str):
    """Escape text for use in HTML that is embedded inside a JavaScript string.

    Falsy input yields ``''``. ``&``, ``<`` and ``>`` are HTML-escaped;
    backslash, both quote characters, newline, carriage return and tab
    become their backslash escape sequences.
    """
    if not input_str:
        return ''
    # One translate pass matches the chained replaces: none of the
    # characters html.escape adds here appear in this table.
    js_escapes = str.maketrans({
        "\\": "\\\\",
        "'": r"\'",
        '"': r'\"',
        "\n": r'\n',
        "\r": r'\r',
        "\t": r'\t',
    })
    return html.escape(input_str, quote=False).translate(js_escapes)
class ExporterBase(QThread):
    """Base class for chat-history exporters that run as a ``QThread``.

    Subclasses implement ``export`` (called from ``run``) and override the
    per-message hooks (``text``, ``image``, ...) they support; the hooks
    defined here do nothing.
    """

    progressSignal = pyqtSignal(int)
    rangeSignal = pyqtSignal(int)
    okSignal = pyqtSignal(int)
    i = 1
    # Identifiers of the supported output formats (values for ``type_``).
    CSV = 0
    DOCX = 1
    HTML = 2
    CSV_ALL = 3
    CONTACT_CSV = 4
    TXT = 5

    def __init__(self, contact, type_=DOCX, message_types=None, time_range=None, messages=None, index=0,
                 parent=None):
        """
        :param contact: the contact whose chat is exported.
        :param type_: output format, one of the class constants.
        :param message_types: mapping of message type to a truthy flag for
            the types to export. Defaults to a fresh empty dict per instance
            (the previous ``{}`` default was shared between all instances).
        :param time_range: stored and passed to the message queries by
            subclasses.
        :param messages: stored as ``self.messages``.
        :param index: stored as ``self.index``.
        :param parent: Qt parent object.
        """
        super().__init__(parent)
        self.message_types = {} if message_types is None else message_types  # message types to export
        self.contact: Contact = contact  # contact being exported
        self.output_type = type_  # output file format
        self.total_num = 1  # total number of messages
        self.num = 0  # number of messages processed so far
        self.index = index
        self.last_timestamp = 0
        self.time_range = time_range
        self.messages = messages
        self.origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        makedirs(self.origin_path)

    def run(self):
        self.export()

    def export(self):
        raise NotImplementedError("export method must be implemented in subclasses")

    def cancel(self):
        self.requestInterruption()

    def is_5_min(self, timestamp) -> bool:
        """Return True, and remember ``timestamp``, when it is more than 300
        away from the last remembered timestamp (five minutes, assuming the
        values are in seconds)."""
        if abs(timestamp - self.last_timestamp) > 300:
            self.last_timestamp = timestamp
            return True
        return False

    def get_avatar_path(self, is_send, message, is_absolute_path=False) -> str:
        """Return the sender's avatar: ``avatar_path`` when
        ``is_absolute_path`` is true, otherwise ``smallHeadImgUrl``.

        In chat rooms the sender object is read from ``message[13]``.
        """
        attr = 'avatar_path' if is_absolute_path else 'smallHeadImgUrl'
        if self.contact.is_chatroom:
            owner = message[13]
        else:
            owner = Me() if is_send else self.contact
        return getattr(owner, attr)

    def get_display_name(self, is_send, message) -> str:
        """Return the escaped display name of the message's sender."""
        if is_send:
            display_name = Me().name
        elif self.contact.is_chatroom:
            display_name = message[13].remark
        else:
            display_name = self.contact.remark
        return escape_js_and_html(display_name)

    # Per-message hooks; subclasses override the ones they support.
    def text(self, doc, message):
        return

    def image(self, doc, message):
        return

    def audio(self, doc, message):
        return

    def emoji(self, doc, message):
        return

    def file(self, doc, message):
        return

    def refermsg(self, doc, message):
        return

    def system_msg(self, doc, message):
        return

    def video(self, doc, message):
        return

    def music_share(self, doc, message):
        return

    def share_card(self, doc, message):
        return

View File

@@ -0,0 +1,96 @@
import os
import re
from app.DataBase import msg_db
from app.util.compress_content import parser_reply, share_card
from app.util.exporter.exporter import ExporterBase
# (label, compiled pattern) pairs, applied in this order. Compiled once at
# import time instead of on every call.
_PRIVACY_PATTERNS = (
    ('phone', re.compile(r'\b(\+?86[-\s]?)?1[3-9]\d{9}\b')),  # mobile number
    ('email', re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')),  # e-mail
    # 18-character IDs (17 digits + digit or X) are tried before 15-digit
    # ones, and both word boundaries apply to every alternative; the old
    # pattern masked only the first 15 digits of an 18-digit ID.
    ('id_card', re.compile(r'\b(?:\d{17}[\dXx]|\d{15})\b')),  # ID card number
    ('password', re.compile(r'\b(?:password|pwd|pass|psw)[\s=:]*\S+\b')),  # password
    ('account', re.compile(r'\b(?:account|username|user|acct)[\s=:]*\S+\b')),  # account
)


def remove_privacy_info(text):
    """Mask phone numbers, e-mail addresses, ID-card numbers, passwords and
    account names in ``text``.

    Each match is replaced by ``[<label> xxx]``, e.g. ``[phone xxx]``.

    :param text: the message text.
    :return: the text with every match replaced.
    """
    for label, pattern in _PRIVACY_PATTERNS:
        text = pattern.sub(f'[{label} xxx]', text)
    return text
class AiTxtExporter(ExporterBase):
    """Export a chat as a compact plain-text transcript with private data masked.

    A ``<name>:`` prefix is written only when the sending side changes
    between consecutive messages.
    """

    # is_send value of the previously written message; -1 means none yet.
    last_is_send = -1

    def title(self, message):
        """Return ``'\\n<name>:'`` when the sending side changed, else ``''``."""
        # NOTE(review): only is_send is compared, so consecutive messages
        # from different chat-room members get no new prefix; confirm
        # whether that is intended.
        is_send = message[4]
        display_name = ''
        if is_send != self.last_is_send:
            display_name = '\n' + self.get_display_name(is_send, message) + ':'
        self.last_is_send = is_send
        return display_name

    def text(self, doc, message):
        str_content = remove_privacy_info(message[7])
        doc.write(
            f'''{self.title(message)}{str_content} '''
        )

    def image(self, doc, message):
        doc.write(
            f'''{self.title(message)}[图片]'''
        )

    def audio(self, doc, message):
        doc.write(
            f'''{self.title(message)}[语音]'''
        )

    def emoji(self, doc, message):
        doc.write(
            f'''{self.title(message)}[表情包]'''
        )

    def file(self, doc, message):
        doc.write(
            f'''{self.title(message)}[文件]'''
        )

    def system_msg(self, doc, message):
        str_content = message[7]
        str_time = message[8]
        # Strip the CDATA wrapper and the "re-edit" link of recalled messages.
        str_content = str_content.replace('<![CDATA[', "").replace(
            ' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
        doc.write(
            f'''{str_time} {str_content}'''
        )

    def video(self, doc, message):
        doc.write(
            f'''{self.title(message)}[视频]'''
        )

    def export(self):
        """Write ``<remark>_chat.txt`` with one section per day.

        Only text messages (type 1) enabled in ``self.message_types`` are
        written. ``progressSignal`` carries the percentage of messages
        processed; ``okSignal`` is emitted with 1 when finished.
        """
        print(f"【开始导出 TXT {self.contact.remark}")
        origin_path = self.origin_path
        os.makedirs(origin_path, exist_ok=True)
        filename = os.path.join(origin_path, self.contact.remark + '_chat.txt')
        messages_by_day = msg_db.get_messages_group_by_day(self.contact.wxid, time_range=self.time_range)
        # Progress is measured over all messages. It used to divide the
        # per-day message index by the number of days, which could exceed
        # 100 and restarted on every day.
        total_steps = sum(len(day_messages) for day_messages in messages_by_day.values())
        done = 0
        last_percent = -1
        with open(filename, mode='w', newline='', encoding='utf-8') as f:
            for date, day_messages in messages_by_day.items():
                f.write(f"\n\n{'*' * 20}{date}{'*' * 20}\n")
                for message in day_messages:
                    done += 1
                    percent = int(done / total_steps * 100)
                    if percent != last_percent:  # emit only when the value changes
                        self.progressSignal.emit(percent)
                        last_percent = percent
                    type_ = message[2]
                    if type_ == 1 and self.message_types.get(type_):
                        self.text(f, message)
        print(f"【完成导出 TXT {self.contact.remark}")
        self.okSignal.emit(1)

View File

@@ -0,0 +1,40 @@
import csv
import os
from app.DataBase import msg_db
from app.person import Me
from app.util.exporter.exporter import ExporterBase
from app.config import OUTPUT_DIR
class CSVExporter(ExporterBase):
    """Export one contact's messages to ``<remark>_utf8.csv`` (UTF-8 with BOM)."""

    def to_csv(self):
        """Write the header row, then one row per message: the first nine
        message fields followed by the sender's remark, nickname and wxid.
        ``okSignal`` is emitted with 1 when done."""
        print(f"【开始导出 CSV {self.contact.remark}")
        origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        os.makedirs(origin_path, exist_ok=True)
        filename = os.path.join(origin_path, f"{self.contact.remark}_utf8.csv")
        columns = [
            'localId', 'TalkerId', 'Type', 'SubType',
            'IsSender', 'CreateTime', 'Status', 'StrContent',
            'StrTime', 'Remark', 'NickName', 'Sender',
        ]
        messages = msg_db.get_messages(self.contact.wxid, time_range=self.time_range)
        # Write the CSV file.
        with open(filename, mode='w', newline='', encoding='utf-8-sig') as file:
            writer = csv.writer(file)
            writer.writerow(columns)
            for msg in messages:
                if self.contact.is_chatroom:
                    # Chat rooms carry the sender object at index 13.
                    sender = msg[13]
                    other_data = [sender.remark, sender.nickName, sender.wxid]
                else:
                    owner = Me() if msg[4] else self.contact
                    other_data = [owner.remark, owner.nickName, owner.wxid]
                writer.writerow([*msg[:9], *other_data])
        print(f"【完成导出 CSV {self.contact.remark}")
        self.okSignal.emit(1)

    def run(self):
        self.to_csv()

View File

@@ -0,0 +1,380 @@
import os
import shutil
import time
from re import findall
import docx
from docx import shared
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.enum.text import WD_COLOR_INDEX, WD_PARAGRAPH_ALIGNMENT
from docx.oxml.ns import qn
from docxcompose.composer import Composer
from app.DataBase import msg_db, hard_link_db
from app.util.exporter.exporter import ExporterBase, escape_js_and_html
from app.config import OUTPUT_DIR
from app.log import logger
from app.person import Me
from app.util.compress_content import parser_reply, share_card, music_share
from app.util.image import get_image_abs_path
from app.util.music import get_music_path
# Control characters to strip (tab, line feed and carriage return are kept).
encoded_chars = b'\x00\x01\x02\x03\x04\x05\x06\x07\x08\x0b\x0c\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f'
# Translation table mapping each of those code points to None (= delete).
char_mapping = dict.fromkeys(encoded_chars)


def filter_control_characters(input_string):
    """
    Remove the control characters listed in ``encoded_chars`` from a string.

    @param input_string: text to clean
    @return: the text without those characters
    """
    return input_string.translate(char_mapping)
class DocxExporter(ExporterBase):
def text(self, doc, message):
    """Append a text message (``message[7]``) to the Word document.

    If python-docx rejects the text with ``ValueError``, it is retried with
    control characters removed; if that also fails, a fixed placeholder is
    written and the text is logged.
    """
    str_content = message[7]
    is_send = message[4]
    avatar = self.get_avatar_path(is_send, message, True)
    content_cell = self.create_table(doc, is_send, avatar)
    try:
        content_cell.paragraphs[0].add_run(str_content)
    except ValueError:
        try:
            str_content = filter_control_characters(str_content)
            content_cell.paragraphs[0].add_run(str_content)
        except ValueError:
            logger.error(f'非法字符:{str_content}')
            content_cell.paragraphs[0].add_run('非法字符')
    # NOTE(review): python-docx exposes font size on run.font.size, not on
    # the paragraph, so this assignment probably has no visible effect; kept
    # unchanged so the output stays the same.
    content_cell.paragraphs[0].font_size = shared.Inches(0.5)
    if is_send:
        p = content_cell.paragraphs[0]
        p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    doc.add_paragraph()
def image(self, doc, message):
    """Append an image message to the Word document.

    The path from ``hard_link_db.get_image(..., thumb=True)`` is tried
    first, then the ``thumb=False`` one. If neither file exists under the
    WeChat data directory, no picture is inserted.
    """
    is_send = message[4]
    bytes_extra = message[10]
    avatar = self.get_avatar_path(is_send, message, True)
    cell = self.create_table(doc, is_send, avatar)
    run = cell.paragraphs[0].add_run()
    escaped_content = escape_js_and_html(message[7])
    image_path = hard_link_db.get_image(escaped_content, bytes_extra, thumb=True)
    base_path = os.path.join(OUTPUT_DIR, '聊天记录', self.contact.remark, 'image')
    if not os.path.exists(os.path.join(Me().wx_dir, image_path)):
        fallback_path = hard_link_db.get_image(escaped_content, bytes_extra, thumb=False)
        if not os.path.exists(os.path.join(Me().wx_dir, fallback_path)):
            return
        image_path = fallback_path
    image_path = get_image_abs_path(image_path, base_path=base_path)
    try:
        run.add_picture(image_path, height=shared.Inches(2))
        doc.add_paragraph()
    except Exception:
        print("Error!image")
def audio(self, doc, message):
    """Append a voice message to the Word document as the placeholder '【语音】'."""
    is_send = message[4]
    avatar = self.get_avatar_path(is_send, message, True)
    content_cell = self.create_table(doc, is_send, avatar)
    content_cell.paragraphs[0].add_run('【语音】')
    # NOTE(review): python-docx exposes font size on run.font.size, not on
    # the paragraph, so this assignment probably has no visible effect; kept
    # unchanged so the output stays the same.
    content_cell.paragraphs[0].font_size = shared.Inches(0.5)
    if is_send:
        p = content_cell.paragraphs[0]
        p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    doc.add_paragraph()
def emoji(self, doc, message):
    """Append an emoji message to the Word document as the placeholder '【表情包】'."""
    is_send = message[4]
    avatar = self.get_avatar_path(is_send, message, True)
    content_cell = self.create_table(doc, is_send, avatar)
    content_cell.paragraphs[0].add_run('【表情包】')
    # NOTE(review): python-docx exposes font size on run.font.size, not on
    # the paragraph, so this assignment probably has no visible effect; kept
    # unchanged so the output stays the same.
    content_cell.paragraphs[0].font_size = shared.Inches(0.5)
    if is_send:
        p = content_cell.paragraphs[0]
        p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    doc.add_paragraph()
def file(self, doc, message):
    """Append a file message to the Word document as the placeholder '【文件】'."""
    is_send = message[4]
    avatar = self.get_avatar_path(is_send, message, True)
    content_cell = self.create_table(doc, is_send, avatar)
    content_cell.paragraphs[0].add_run('【文件】')
    # NOTE(review): python-docx exposes font size on run.font.size, not on
    # the paragraph, so this assignment probably has no visible effect; kept
    # unchanged so the output stays the same.
    content_cell.paragraphs[0].font_size = shared.Inches(0.5)
    if is_send:
        p = content_cell.paragraphs[0]
        p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    doc.add_paragraph()
def refermsg(self, doc, message):
    """
    Append a reply (quote) message to the Word document.

    The reply text goes into the first paragraph of the content cell. The
    quoted message ("<displayname>:<content>", or '未知引用' when no quote
    is available) goes into a second, grey, highlighted paragraph.

    @param doc: the python-docx document being built
    @param message: message row; index 4 is is_send, index 11 the compressed content
    @return: None
    """
    is_send = message[4]
    content = parser_reply(message[11])
    refer_msg = content.get('refer')
    avatar = self.get_avatar_path(is_send, message, True)
    content_cell = self.create_table(doc, is_send, avatar)
    content_cell.paragraphs[0].add_run(content.get('title'))
    # NOTE(review): python-docx exposes font size on run.font.size; this
    # assignment and run.font_size below probably have no visible effect
    # and are kept unchanged so the output stays the same.
    content_cell.paragraphs[0].font_size = shared.Inches(0.5)
    reply_p = content_cell.add_paragraph()
    reply_content = f"{refer_msg.get('displayname')}:{refer_msg.get('content')}" if refer_msg else '未知引用'
    # Write into the paragraph just created rather than looking it up by index.
    run = reply_p.add_run(reply_content)
    # Formatting of the quoted text.
    run.font.color.rgb = shared.RGBColor(121, 121, 121)
    run.font_size = shared.Inches(0.3)
    run.font.highlight_color = WD_COLOR_INDEX.GRAY_25
    if is_send:
        p = content_cell.paragraphs[0]
        p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
        reply_p.paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    doc.add_paragraph()
def system_msg(self, doc, message):
    """
    Append a centred system notice to the Word document.

    The CDATA opener, the "re-edit" link and any img/revo/_wc_cus/a tags
    are removed from the text first.

    @param doc: document the paragraph is appended to
    @param message: message row; index 7 is the raw notice text
    @return: None
    """
    notice = message[7].replace('<![CDATA[', "")
    notice = notice.replace(' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
    # findall yields (whole tag, tag name) pairs; only the whole tag is needed.
    for tag, _name in findall('(</{0,1}(img|revo|_wc_cus|a).*?>)', notice):
        notice = notice.replace(tag, "")
    paragraph = doc.add_paragraph(notice)
    paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
def video(self, doc, message):
    """
    Append a placeholder bubble for a video message to the Word document.

    Only the literal marker text is written; the video is not embedded.

    @param doc: document the bubble is appended to
    @param message: message row; index 4 is the is-send flag
    @return: None
    """
    sent_by_me = message[4]
    # Lookups are kept in their original order; only the last avatar lookup
    # is used below.
    self.get_avatar_path(sent_by_me, message)
    self.get_display_name(sent_by_me, message)
    avatar_file = self.get_avatar_path(sent_by_me, message, True)
    bubble = self.create_table(doc, sent_by_me, avatar_file)
    bubble.paragraphs[0].add_run('【视频】')
    bubble.paragraphs[0].font_size = shared.Inches(0.5)
    if sent_by_me:
        bubble.paragraphs[0].paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    doc.add_paragraph()
def create_table(self, doc, is_send, avatar_path):
    """
    Create a 1x2 table that holds one chat bubble and return its content cell.

    is_send truthy:  cell (0,0) holds the chat content, cell (0,1) the avatar.
    is_send falsy:   cell (0,0) holds the avatar, cell (0,1) the chat content.

    @param doc: document the table is added to
    @param is_send: truthy when the message was sent by the exporting user
    @param avatar_path: picture file inserted as the avatar
    @return: the cell that should receive the chat content
    """
    table = doc.add_table(rows=1, cols=2, style='Normal Table')
    table.cell(0, 1).height = shared.Inches(0.5)
    table.cell(0, 0).height = shared.Inches(0.5)
    avatar_col, content_col = (1, 0) if is_send else (0, 1)
    if is_send:
        # Right-align the whole table for outgoing messages.
        table.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    # Insert the avatar and make its cell as wide as the picture.
    avatar_run = table.cell(0, avatar_col).paragraphs[0].add_run()
    avatar_run.add_picture(avatar_path, width=shared.Inches(0.5))
    table.cell(0, avatar_col).width = shared.Inches(0.5)
    content_cell = table.cell(0, content_col)
    if is_send:
        # Right-align the chat content for outgoing messages.
        content_cell.paragraphs[0].paragraph_format.alignment = WD_PARAGRAPH_ALIGNMENT.RIGHT
    # Vertically centre the chat content.
    content_cell.vertical_alignment = WD_ALIGN_VERTICAL.CENTER
    return content_cell
def music_share(self, doc, message):
    """
    Resolve the local audio file of a shared-music message.

    Nothing is written to ``doc`` by this implementation; it only performs
    the lookups.

    @param doc: document being built (not written to here)
    @param message: message row; index 11 is the payload parsed by the
        module-level ``music_share`` helper
    @return: None
    """
    export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
    sent_by_me = message[4]
    info = music_share(message[11])
    local_audio = ''
    if info.get('audio_url') != '':
        local_audio = get_music_path(info.get('audio_url'), info.get('title'),
                                     output_path=export_root + '/music')
    if local_audio != '':
        # Path relative to the export folder, with forward slashes.
        local_audio = f'./music/{os.path.basename(local_audio)}'.replace('\\', '/')
    self.get_avatar_path(sent_by_me, message)
    self.get_display_name(sent_by_me, message)
def share_card(self, doc, message):
    """
    Copy the thumbnail and app logo of a shared-link card into the export.

    Nothing is written to ``doc`` by this implementation.

    @param doc: document being built (not written to here)
    @param message: message row; indexes 10 and 11 are handed to the
        module-level ``share_card`` helper
    @return: None
    """
    export_root = f"{os.getcwd()}/data/聊天记录/{self.contact.remark}"
    sent_by_me = message[4]
    card = share_card(message[10], message[11])
    self.get_avatar_path(sent_by_me, message)
    self.get_display_name(sent_by_me, message)

    def copy_into_export(relative_path):
        # Returns './image/<name>' after copying, or '' when there is no
        # path or the source file does not exist.
        if not relative_path:
            return ''
        source = os.path.join(Me().wx_dir, relative_path)
        if not os.path.exists(source):
            return ''
        shutil.copy(source, os.path.join(export_root, 'image', os.path.basename(source)))
        return './image/' + os.path.basename(source)

    thumbnail = copy_into_export(card.get('thumbnail'))
    app_logo = copy_into_export(card.get('app_logo'))
def merge_docx(self, conRemark, n):
    """
    Merge the numbered part files into a single ``{conRemark}.docx``.

    Parts are expected at ``<output>/聊天记录/{conRemark}{i}.docx`` for
    ``i`` in ``range(n)``. Each part gets a trailing page break and is
    appended, in order, to a freshly created empty master document.

    Differences from the previous implementation:
    - the first part (index 0) is merged too; it used to be skipped and
      then deleted, which lost its messages;
    - part files are removed only after the merged file has been saved.

    @param conRemark: base name of the part files and of the merged file
    @param n: number of part files
    @return: None
    """
    origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录')
    merged_path = origin_path + '/' + f"{conRemark}.docx"
    part_paths = [origin_path + '/' + f"{conRemark}{i}.docx" for i in range(n)]
    # Create the empty master on disk, then reopen it as the merge target.
    docx.Document().save(merged_path)
    composer = Composer(docx.Document(merged_path))
    for part_path in part_paths:
        part = docx.Document(part_path)
        part.add_page_break()
        composer.append(part)
    composer.save(merged_path)
    # Safe to delete the parts now that the merged document is on disk.
    for part_path in part_paths:
        os.remove(part_path)
def export(self):
    """
    Export the conversation as numbered DOCX files.

    Avatars are saved first, then messages are rendered in order. A new
    document is started every 200 messages; each finished document is
    saved as ``{remark}_{n}.docx`` and announced through ``okSignal``.
    When everything is done ``okSignal`` is emitted with 10086.

    Avatar saving for chatroom members is best effort: a failure prints the
    offending row and continues. Only ``Exception`` is caught there, so
    interpreter-exit signals are no longer swallowed.

    @return: None
    """
    print(f"【开始导出 DOCX {self.contact.remark}")
    origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
    messages = msg_db.get_messages(self.contact.wxid, time_range=self.time_range)
    Me().save_avatar(os.path.join(origin_path, 'avatar', f'{Me().wxid}.png'))
    if self.contact.is_chatroom:
        for message in messages:
            if message[4]:  # is_send
                continue
            try:
                chatroom_avatar_path = os.path.join(origin_path, 'avatar', f'{message[13].wxid}.png')
                message[13].save_avatar(chatroom_avatar_path)
            except Exception:
                print(message)
    else:
        self.contact.save_avatar(os.path.join(origin_path, 'avatar', f'{self.contact.wxid}.png'))
    self.rangeSignal.emit(len(messages))

    doc = None
    n = 0

    def newdoc():
        # Start a fresh document with the default fonts and bump the counter.
        nonlocal n, doc
        doc = docx.Document()
        doc.styles["Normal"].font.name = "Cambria"
        doc.styles["Normal"]._element.rPr.rFonts.set(qn("w:eastAsia"), "宋体")
        n += 1

    index = 0
    newdoc()
    for index, message in enumerate(messages):
        if index % 200 == 0 and index:
            # Flush the current chunk before starting the next one.
            filename = os.path.join(origin_path, f"{self.contact.remark}_{n}.docx")
            doc.save(filename)
            self.okSignal.emit(n)
            newdoc()
        type_ = message[2]
        sub_type = message[3]
        timestamp = message[5]
        self.progressSignal.emit(1)
        if self.is_5_min(timestamp):
            str_time = message[8]
            doc.add_paragraph(str_time).alignment = WD_PARAGRAPH_ALIGNMENT.CENTER
        if type_ == 1 and self.message_types.get(type_):
            self.text(doc, message)
        elif type_ == 3 and self.message_types.get(type_):
            self.image(doc, message)
        elif type_ == 34 and self.message_types.get(type_):
            self.audio(doc, message)
        elif type_ == 43 and self.message_types.get(type_):
            self.video(doc, message)
        elif type_ == 47 and self.message_types.get(type_):
            self.emoji(doc, message)
        elif type_ == 10000 and self.message_types.get(type_):
            self.system_msg(doc, message)
        elif type_ == 49 and sub_type == 57 and self.message_types.get(1):
            # Replies are governed by the text-message switch.
            self.refermsg(doc, message)
        elif type_ == 49 and sub_type == 6 and self.message_types.get(4906):
            self.file(doc, message)
        if index % 25 == 0:
            print(f"【导出 DOCX {self.contact.remark}{index}/{len(messages)}")
    if index % 25:
        # Final progress line when the last index was not already printed.
        print(f"【导出 DOCX {self.contact.remark}{index + 1}/{len(messages)}")
    filename = os.path.join(origin_path, f"{self.contact.remark}_{n}.docx")
    try:
        doc.save(filename)
    except PermissionError:
        # Target not writable: retry under a name made unique with the time.
        filename = filename[:-5] + f'{time.time()}' + '.docx'
        doc.save(filename)
    self.okSignal.emit(n)
    print(f"【完成导出 DOCX {self.contact.remark}")
    self.okSignal.emit(10086)

View File

@@ -0,0 +1,523 @@
import os
import shutil
import sys
import traceback
from re import findall
from PyQt5.QtCore import pyqtSignal, QThread
from app.DataBase import msg_db, hard_link_db, media_msg_db
from app.util.exporter.exporter import ExporterBase, escape_js_and_html
from app.config import OUTPUT_DIR
from app.log import logger
from app.person import Me
from app.util import path
from app.util.compress_content import parser_reply, share_card, music_share, file, transfer_decompress, call_decompress
from app.util.emoji import get_emoji_url
from app.util.image import get_image_path, get_image
from app.util.music import get_music_path
# Maps an icon path (relative to the exported page) to the file extensions
# that should be displayed with that icon.
icon_files = dict([
    ('./icon/word.png', ['doc', 'docx']),
    ('./icon/excel.png', ['xls', 'xlsx']),
    ('./icon/csv.png', ['csv']),
    ('./icon/txt.png', ['txt']),
    ('./icon/zip.png', ['zip', '7z', 'rar']),
    ('./icon/ppt.png', ['ppt', 'pptx']),
    ('./icon/pdf.png', ['pdf']),
])
class HtmlExporter(ExporterBase):
def text(self, doc, message):
    """
    Write one plain-text message as a JavaScript object literal.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp, 7: content)
    @return: None
    """
    is_send = message[4]
    timestamp = message[5]
    is_chatroom = 1 if self.contact.is_chatroom else 0
    display_name = self.get_display_name(is_send, message)
    avatar = self.get_avatar_path(is_send, message)
    # Escape the body so quotes and newlines cannot end the JS string early.
    body = escape_js_and_html(message[7])
    record = f'''{{ type:{1}, text: '{body}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
    doc.write(record)
def image(self, doc, message):
    """
    Write one image message as a JavaScript object literal.

    The image location is resolved through ``hard_link_db.get_image`` and
    then mapped with ``get_image_path`` using the conversation's image
    folder as base path.

    @param doc: writable text stream receiving the generated records
    @param message: message row (2: type, 4: is-send flag, 5: timestamp,
        7: content, 10: extra bytes)
    @return: None
    """
    image_dir = os.path.join(OUTPUT_DIR, '聊天记录', self.contact.remark, 'image')
    type_ = message[2]
    is_send = message[4]
    timestamp = message[5]
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    escaped_content = escape_js_and_html(message[7])
    source_image = hard_link_db.get_image(escaped_content, message[10], up_dir=Me().wx_dir, thumb=False)
    image_path = get_image_path(source_image, base_path=image_dir)
    record = f'''{{ type:{type_}, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
    doc.write(record)
def audio(self, doc, message):
    """
    Write one voice message as a JavaScript object literal.

    If the audio path cannot be obtained the error is logged and nothing
    is written for this message.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp, 7: content,
        9: server message id)
    @return: None
    """
    export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
    str_content = message[7]
    is_send = message[4]
    timestamp = message[5]
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    try:
        stored_audio = media_msg_db.get_audio_path(message[9], output_path=export_root + "/voice")
        audio_path = "./voice/" + os.path.basename(stored_audio)
    except:
        logger.error(traceback.format_exc())
        return
    voice_to_text = media_msg_db.get_audio_text(str_content)
    if voice_to_text:
        # Only a non-empty transcript is escaped.
        voice_to_text = escape_js_and_html(voice_to_text)
    record = f'''{{ type:34, text:'{audio_path}',is_send:{is_send},avatar_path:'{avatar}',voice_to_text:'{voice_to_text}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
    doc.write(record)
def emoji(self, doc, message):
    """
    Write one sticker message as a JavaScript object literal.

    The record is emitted with ``type:3`` and the URL returned by
    ``get_emoji_url`` as its text.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp, 7: content)
    @return: None
    """
    is_send = message[4]
    timestamp = message[5]
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    emoji_path = get_emoji_url(message[7], thumb=True)
    record = f'''{{ type:{3}, text: '{emoji_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
    doc.write(record)
def file(self, doc, message):
    """
    Write one file message as a JavaScript object literal.

    Nothing is written unless the module-level ``file`` helper reports
    ``is_error == False``. The icon is chosen from ``icon_files`` by file
    extension, with ``./icon/file.png`` as fallback.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp,
        10: extra bytes, 11: compressed content)
    @return: None
    """
    export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
    is_send = message[4]
    timestamp = message[5]
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    file_info = file(message[10], message[11], output_path=export_root + '/file')
    if file_info.get('is_error') != False:
        return
    # First icon whose extension list contains this file's extension.
    icon_path = next(
        (icon for icon, extensions in icon_files.items() if file_info.get('file_ext') in extensions),
        './icon/file.png',
    )
    file_path = file_info.get('file_path')
    if file_path != "":
        file_path = './file/' + file_info.get('file_name')
    record = f'''{{ type:49, text: '{file_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',icon_path: '{icon_path}',sub_type:6,file_name: '{file_info.get('file_name')}',file_size: '{file_info.get('file_len')}',app_name: '{file_info.get('app_name')}'}},'''
    doc.write(record)
def refermsg(self, doc, message):
    """
    Write one reply (quoting) message as a JavaScript object literal.

    When the parsed reply carries a ``refer`` entry the record also gets a
    ``refer_text`` field built from the quoted sender and content.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp,
        11: payload handed to ``parser_reply``)
    @return: None
    """
    is_send = message[4]
    content = parser_reply(message[11])
    refer_msg = content.get('refer')
    timestamp = message[5]
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    contentText = escape_js_and_html(content.get('title'))
    if not refer_msg:
        doc.write(
            f'''{{ type:49, text: '{contentText}',is_send:{is_send},sub_type:{content.get('type')},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
        )
        return
    quoted_sender = escape_js_and_html(refer_msg.get('displayname'))
    quoted_body = escape_js_and_html(refer_msg.get('content'))
    referText = f"{quoted_sender}{quoted_body}"
    doc.write(
        f'''{{ type:49, text: '{contentText}',is_send:{is_send},sub_type:{content.get('type')},refer_text: '{referText}',avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
    )
def system_msg(self, doc, message):
    """
    Write one system notice as a JavaScript object literal (``type:0``).

    The CDATA opener, the "re-edit" link and any img/revo/_wc_cus/a tags
    are removed before the text is escaped.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp, 7: content)
    @return: None
    """
    is_send = message[4]
    timestamp = message[5]
    is_chatroom = 1 if self.contact.is_chatroom else 0
    notice = message[7].replace('<![CDATA[', "")
    notice = notice.replace(' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
    # findall yields (whole tag, tag name) pairs; only the whole tag is needed.
    for tag, _name in findall('(</{0,1}(img|revo|_wc_cus|a).*?>)', notice):
        notice = notice.replace(tag, "")
    str_content = escape_js_and_html(notice)
    record = f'''{{ type:0, text: '{str_content}',is_send:{is_send},avatar_path:'',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:''}},'''
    doc.write(record)
def video(self, doc, message):
    """
    Write one video message as a JavaScript object literal.

    - No video and no thumbnail: nothing is written.
    - Thumbnail only: an image record (``type:3``) is written; if touching
      the thumbnail's timestamps fails, a text record saying the video is
      missing is written instead.
    - Video available: if the source file exists it is copied into the
      export's ``video`` folder (unless already there) and the record
      points at the copy.

    @param doc: writable text stream receiving the generated records
    @param message: message row (2: type, 4: is-send flag, 5: timestamp,
        7: content, 10: extra bytes)
    @return: None
    """
    export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
    type_ = message[2]
    str_content = message[7]
    is_send = message[4]
    BytesExtra = message[10]
    timestamp = message[5]
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    video_path = hard_link_db.get_video(str_content, BytesExtra, thumb=False)
    image_path = hard_link_db.get_video(str_content, BytesExtra, thumb=True)

    if video_path is None:
        if image_path is None:
            return
        image_path = path.get_relative_path(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
        try:
            # todo: thumbnails that are network images are not handled
            print(export_root + image_path[1:])
            os.utime(export_root + image_path[1:], (timestamp, timestamp))
            doc.write(
                f'''{{ type:3, text: '{image_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
            )
        except:
            doc.write(
                f'''{{ type:1, text: '视频丢失',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
            )
        return

    video_path = f'{Me().wx_dir}/{video_path}'.replace('\\', '/')
    if os.path.exists(video_path):
        exported_copy = export_root + '/video/' + os.path.basename(video_path)
        if not os.path.exists(exported_copy):
            shutil.copy(video_path, os.path.join(export_root, 'video'))
        os.utime(exported_copy, (timestamp, timestamp))
        video_path = f'./video/{os.path.basename(video_path)}'
    doc.write(
        f'''{{ type:{type_}, text: '{video_path}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}'}},'''
    )
def music_share(self, doc, message):
    """
    Write one shared-music message as a JavaScript object literal.

    When parsing succeeded and an audio URL is present, the audio is
    fetched through ``get_music_path`` and referenced relative to the
    export folder.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp,
        11: payload parsed by the module-level ``music_share`` helper)
    @return: None
    """
    export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
    is_send = message[4]
    timestamp = message[5]
    content = music_share(message[11])
    music_path = ''
    parsed_ok = content.get('is_error') == False
    if parsed_ok and content.get('audio_url') != '':
        music_path = get_music_path(content.get('audio_url'), content.get('title'),
                                    output_path=export_root + '/music')
        if music_path != '':
            music_path = f'./music/{os.path.basename(music_path)}'.replace('\\', '/')
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    music_path = escape_js_and_html(music_path)
    record = f'''{{ type:49, text:'{music_path}',is_send:{is_send},avatar_path:'{avatar}',link_url:'{content.get('link_url')}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',sub_type:3,title:'{content.get('title')}',artist:'{content.get('artist')}', website_name:'{content.get('website_name')}'}},'''
    doc.write(record)
def share_card(self, doc, message):
    """
    Write one shared-link card as a JavaScript object literal.

    An existing thumbnail or app logo is copied into the export's
    ``image`` folder. A missing thumbnail becomes an empty string, while a
    missing app logo falls back to the raw ``app_logo`` value of the card.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp,
        10: extra bytes, 11: compressed content)
    @return: None
    """
    export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
    is_send = message[4]
    timestamp = message[5]
    card_data = share_card(message[10], message[11])
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)

    def copy_to_image_dir(source):
        # Copies the file into the export and returns its relative path.
        shutil.copy(source, os.path.join(export_root, 'image', os.path.basename(source)))
        return './image/' + os.path.basename(source)

    thumbnail = ''
    if card_data.get('thumbnail'):
        thumbnail_source = os.path.join(Me().wx_dir, card_data.get('thumbnail'))
        thumbnail = copy_to_image_dir(thumbnail_source) if os.path.exists(thumbnail_source) else ''

    app_logo = ''
    if card_data.get('app_logo'):
        logo_source = os.path.join(Me().wx_dir, card_data.get('app_logo'))
        if os.path.exists(logo_source):
            app_logo = copy_to_image_dir(logo_source)
        else:
            app_logo = card_data.get('app_logo')

    record = f'''{{ type:49,sub_type:5, text:'',is_send:{is_send},avatar_path:'{avatar}',url:'{card_data.get('url')}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',title:'{card_data.get('title')}',description:'{card_data.get('description')}',thumbnail:'{thumbnail}',app_logo:'{app_logo}',app_name:'{card_data.get('app_name')}'}},\n'''
    doc.write(record)
def transfer(self, doc, message):
    """
    Write one money-transfer message as a JavaScript object literal.

    The display text is chosen by ``paysubtype``. Any failure while
    building the record is logged and nothing is written.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp,
        11: compressed content handed to ``transfer_decompress``)
    @return: None
    """
    is_send = message[4]
    timestamp = message[5]
    transfer_detail = transfer_decompress(message[11])
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    try:
        pay_memo = transfer_detail["pay_memo"]
        text_info_map = {
            1: pay_memo or "发起转账",
            3: "已收款",
            4: "已退还",
            5: "非实时转账收款",
            7: "发起非实时转账",
            8: "未知",
            9: "未知",
        }
        pay_subtype = transfer_detail["paysubtype"]
        status_text = text_info_map[pay_subtype]
        fee_desc = transfer_detail["feedesc"]
        doc.write(
            f"""{{ type:49,sub_type:2000,text:'{status_text}',is_send:{is_send},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',paysubtype:{pay_subtype},pay_memo:'{pay_memo}',feedesc:'{fee_desc}',}},\n""")
    except Exception:
        logger.error(f'转账解析错误:{transfer_detail}\n{traceback.format_exc()}')
def call(self, doc, message):
    """
    Write one voice/video call message as a JavaScript object literal.

    @param doc: writable text stream receiving the generated records
    @param message: message row (4: is-send flag, 5: timestamp, 7: content,
        10: extra bytes, 12: display content)
    @return: None
    """
    is_send = message[4]
    timestamp = message[5]
    call_detail = call_decompress(is_send, message[10], message[12], message[7])
    is_chatroom = 1 if self.contact.is_chatroom else 0
    avatar = self.get_avatar_path(is_send, message)
    display_name = self.get_display_name(is_send, message)
    shown_text = call_detail["display_content"]
    call_type = call_detail["call_type"]
    doc.write(
        f"""{{ type:50, text:'{shown_text}',call_type:{call_type},avatar_path:'{avatar}',timestamp:{timestamp},is_chatroom:{is_chatroom},displayname:'{display_name}',}},\n""")
def export(self):
    """
    Export the conversation as a single HTML page.

    The template is split at its marker comment; the head (with the title
    replaced by the contact's remark) is written first, then one record
    per enabled message, then the template tail. The output file is opened
    in a ``with`` block so the handle is closed even if a handler raises.

    Progress is not emitted here for image, voice and sticker messages
    when those types are enabled.

    @return: None
    """
    print(f"【开始导出 HTML {self.contact.remark}")
    messages = msg_db.get_messages(self.contact.wxid, time_range=self.time_range)
    filename = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark,
                            f'{self.contact.remark}.html')
    file_path = './app/resources/data/template.html'
    if not os.path.exists(file_path):
        # Fall back to the bundle directory (sys._MEIPASS) or this module's folder.
        resource_dir = getattr(sys, '_MEIPASS', os.path.abspath(os.path.dirname(__file__)))
        file_path = os.path.join(resource_dir, 'app', 'resources', 'data', 'template.html')
    with open(file_path, "r", encoding="utf-8") as template:
        content = template.read()
    html_head, html_end = content.split('/*注意看这是分割线*/')
    html_head = html_head.replace("<title>出错了</title>", f"<title>{self.contact.remark}</title>")
    html_head = html_head.replace("<p id=\"title\">出错了</p>", f"<p id=\"title\">{self.contact.remark}</p>")
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(html_head)
        self.rangeSignal.emit(len(messages))
        for index, message in enumerate(messages):
            type_ = message[2]
            sub_type = message[3]
            if (type_ == 3 and self.message_types.get(3)) or (type_ == 34 and self.message_types.get(34)) or (
                    type_ == 47 and self.message_types.get(47)):
                pass
            else:
                self.progressSignal.emit(1)
            if type_ == 1 and self.message_types.get(type_):
                self.text(f, message)
            elif type_ == 3 and self.message_types.get(type_):
                self.image(f, message)
            elif type_ == 34 and self.message_types.get(type_):
                self.audio(f, message)
            elif type_ == 43 and self.message_types.get(type_):
                self.video(f, message)
            elif type_ == 47 and self.message_types.get(type_):
                self.emoji(f, message)
            elif type_ == 10000 and self.message_types.get(type_):
                self.system_msg(f, message)
            elif type_ == 49 and sub_type == 57 and self.message_types.get(1):
                # Replies are governed by the text-message switch.
                self.refermsg(f, message)
            elif type_ == 49 and sub_type == 6 and self.message_types.get(4906):
                self.file(f, message)
            elif type_ == 49 and sub_type == 3 and self.message_types.get(4903):
                self.music_share(f, message)
            elif type_ == 49 and sub_type == 5 and self.message_types.get(4905):
                self.share_card(f, message)
            elif type_ == 49 and sub_type == 2000 and self.message_types.get(492000):
                self.transfer(f, message)
            elif type_ == 50 and self.message_types.get(50):
                self.call(f, message)
            if index % 2000 == 0:
                print(f"【导出 HTML {self.contact.remark}{index}/{len(messages)}")
        f.write(html_end)
    print(f"【完成导出 HTML {self.contact.remark}{len(messages)}")
    self.count_finish_num(1)
def count_finish_num(self, num):
    """
    Record that one more worker has finished.

    Emits ``okSignal`` with 1 once the finished count equals
    ``self.total_num``.

    @param num: not used by this implementation
    @return: None
    """
    self.num += 1
    print("子线程完成", self.num, "/", self.total_num)
    if self.num != self.total_num:
        return
    # Every worker has reported in: announce completion.
    self.okSignal.emit(1)
class OutputMedia(QThread):
    """
    Worker thread that exports voice messages.
    """
    okSingal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)

    def __init__(self, contact):
        super().__init__()
        self.contact = contact

    def run(self):
        """Export every type-34 message, emitting progress once per message."""
        export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        for message in msg_db.get_messages_by_type(self.contact.wxid, 34):
            server_id = message[9]
            try:
                media_msg_db.get_audio(
                    server_id, output_path=export_root + "/voice"
                )
            except:
                logger.error(traceback.format_exc())
            finally:
                # Progress advances whether or not the export succeeded.
                self.progressSignal.emit(1)
        self.okSingal.emit(34)
class OutputEmoji(QThread):
    """
    Worker thread for sticker messages.
    """
    okSingal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)

    def __init__(self, contact):
        super().__init__()
        self.contact = contact

    def run(self):
        """Emit progress once per type-47 message, then signal completion.

        The actual sticker download is currently disabled, so each
        iteration only reports progress.
        """
        origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        for message in msg_db.get_messages_by_type(self.contact.wxid, 47):
            str_content = message[7]
            self.progressSignal.emit(1)
        self.okSingal.emit(47)
class OutputImage(QThread):
    """
    Worker thread that exports images.
    """
    okSingal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)

    def __init__(self, contact):
        super().__init__()
        self.contact = contact
        self.child_thread_num = 2
        self.child_threads = [0] * (self.child_thread_num + 1)
        self.num = 0

    def count1(self, num):
        """Count one finished child; emit ``okSingal`` when all are done."""
        self.num += 1
        print("图片导出完成一个")
        if self.num == self.child_thread_num:
            self.okSingal.emit(47)
            print("图片导出完成")

    def run(self):
        """Export every type-3 message, emitting progress once per message.

        The full-size image is preferred; the thumbnail is used when the
        full-size file is missing, and the message is skipped when both
        are missing.
        """
        export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        messages = msg_db.get_messages_by_type(self.contact.wxid, 3)
        image_dir = os.path.join(OUTPUT_DIR, '聊天记录', self.contact.remark, 'image')
        for message in messages:
            str_content = message[7]
            BytesExtra = message[10]
            timestamp = message[5]
            try:
                chosen = hard_link_db.get_image(
                    str_content, BytesExtra, thumb=False
                )
                if not os.path.exists(os.path.join(Me().wx_dir, chosen)):
                    chosen = hard_link_db.get_image(
                        str_content, BytesExtra, thumb=True
                    )
                    if not os.path.exists(os.path.join(Me().wx_dir, chosen)):
                        continue
                exported = get_image(
                    chosen, base_path=image_dir
                )
                try:
                    # Best effort: stamp the exported file with the message time.
                    os.utime(export_root + exported[1:], (timestamp, timestamp))
                except:
                    pass
            except:
                logger.error(traceback.format_exc())
            finally:
                self.progressSignal.emit(1)
        self.okSingal.emit(47)
class OutputImageChild(QThread):
    """Worker thread that exports the images of a given list of messages."""
    okSingal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)

    def __init__(self, contact, messages):
        super().__init__()
        self.contact = contact
        self.messages = messages

    def run(self):
        """Export each message's image, emitting progress once per message.

        The full-size image is preferred; the thumbnail is used when the
        full-size file is missing, and the message is skipped when both
        are missing.
        """
        export_root = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        for message in self.messages:
            str_content = message[7]
            BytesExtra = message[10]
            timestamp = message[5]
            try:
                chosen = hard_link_db.get_image(
                    str_content, BytesExtra, thumb=False
                )
                if not os.path.exists(os.path.join(Me().wx_dir, chosen)):
                    chosen = hard_link_db.get_image(
                        str_content, BytesExtra, thumb=True
                    )
                    if not os.path.exists(os.path.join(Me().wx_dir, chosen)):
                        continue
                exported = get_image(
                    chosen, base_path=f"/data/聊天记录/{self.contact.remark}/image"
                )
                try:
                    # Best effort: stamp the exported file with the message time.
                    os.utime(export_root + exported[1:], (timestamp, timestamp))
                except:
                    pass
            except:
                logger.error(traceback.format_exc())
            finally:
                self.progressSignal.emit(1)
        self.okSingal.emit(47)
        print("图片子线程完成")

View File

@@ -0,0 +1,193 @@
import json
import random
import os
from app.DataBase import msg_db
from app.person import Me
from .exporter import ExporterBase
def merge_content(conversions_list) -> list:
    """
    Merge consecutive entries of a conversation that share the same role.

    The contents of each run are joined with a newline. Only ``role`` and
    ``content`` are kept in the result.

    @param conversions_list: list of dicts with ``role`` and ``content`` keys
    @return: list of merged ``{"role", "content"}`` dicts
    """
    merged = []
    pending_role = None
    pending_text = ""
    for entry in conversions_list:
        role, text = entry["role"], entry["content"]
        if pending_role is not None and pending_role == role:
            # Same speaker continues: extend the pending block.
            pending_text += "\n" + text
            continue
        if pending_role is not None:
            # Speaker changed: flush the finished block.
            merged.append({"role": pending_role, "content": pending_text})
        pending_role, pending_text = role, text
    # Flush the last block, if any.
    if pending_role is not None:
        merged.append({"role": pending_role, "content": pending_text})
    return merged
def system_prompt():
    """
    Build the system-role entry that opens every conversation.

    @return: dict with ``role`` set to "system" and a prompt that embeds
        ``Me().name``
    """
    prompt_text = f"你是{Me().name},一个聪明、热情、善良的人,后面的对话来自你的朋友,你要认真地回答他"
    return {"role": "system", "content": prompt_text}
def message_to_conversion(group):
    """
    Turn a group of message rows into a merged conversation.

    Trailing received messages (index 4 equal to 0) are popped from
    ``group`` in place, and sent messages that come before the first
    received one are skipped.

    @param group: list of message rows (4: is-send flag, 7: content,
        8: time string); modified in place
    @return: merged conversation list, or ``[]`` when only the system
        prompt would remain
    """
    conversation = [system_prompt()]
    # Drop trailing messages from the other side.
    while group and group[-1][4] == 0:
        group.pop()
    for row in group:
        sent_by_me = row[4]
        if sent_by_me and len(conversation) == 1:
            # Nothing from the other side yet: skip leading sent messages.
            continue
        conversation.append({
            "role": "assistant" if sent_by_me else "user",
            "content": row[7],
            "str_time": row[8],
        })
    if len(conversation) == 1:
        return []
    return merge_content(conversation)
class JsonExporter(ExporterBase):
def split_by_time(self, length=300):
    """
    Split the text messages into conversations by elapsed time.

    A group collects messages while their timestamp is less than
    ``length`` after the start marker, then keeps absorbing consecutive
    sent messages. The start marker is then moved to the timestamp of the
    next unprocessed message.

    Fix: the second loop is now guarded by ``i < total``. Previously, when
    the first loop consumed the final message and that message was a sent
    one, the second loop appended it again.

    @param length: window size, in the unit of the message timestamps
    @return: list of ``{'conversations': [...]}`` dicts with consecutive
        same-role entries merged
    """
    messages = msg_db.get_messages_by_type(self.contact.wxid, type_=1, time_range=self.time_range)
    total = len(messages)
    window_start = 0
    grouped = []
    i = 0
    while i < total:
        message = messages[i]
        timestamp = message[5]
        is_send = message[4]
        group = [system_prompt()]
        # Phase 1: everything that falls inside the time window.
        while i < total and timestamp - window_start < length:
            group.append({
                "role": "assistant" if is_send else "user",
                "content": message[7],
            })
            i += 1
            if i >= total:
                break
            message = messages[i]
            timestamp = message[5]
            is_send = message[4]
        # Phase 2: consecutive sent messages that follow the window.
        while i < total and is_send:
            group.append({"role": "assistant", "content": message[7]})
            i += 1
            if i >= total:
                break
            message = messages[i]
            timestamp = message[5]
            is_send = message[4]
        window_start = timestamp
        grouped.append({"conversations": group})
    return [
        {'conversations': merge_content(item['conversations'])}
        for item in grouped
    ]
def split_by_intervals(self, max_diff_seconds=300):
    """
    Split the text messages into conversations by gaps between messages.

    Each group starts at the first non-sent message at or after the
    current position (or at the last message when none is left). It grows
    while the gap to the previous message is at most ``max_diff_seconds``,
    and finally absorbs any directly following sent messages. Groups that
    convert to an empty conversation are dropped.

    @param max_diff_seconds: largest allowed timestamp gap inside a group
    @return: list of ``{'conversations': [...]}`` dicts
    """
    messages = msg_db.get_messages_by_type(self.contact.wxid, type_=1, time_range=self.time_range)
    total = len(messages)
    groups = []
    i = 0
    while i < total:
        # Skip sent messages until a received one (or the last message).
        while messages[i][4] and i + 1 < total:
            i += 1
        current_group = [messages[i]]
        i += 1
        while i < total and messages[i][5] - current_group[-1][5] <= max_diff_seconds:
            current_group.append(messages[i])
            i += 1
        while i < total and messages[i][4]:
            current_group.append(messages[i])
            i += 1
        groups.append(current_group)
    result = []
    for group in groups:
        conversations = message_to_conversion(group)
        if conversations:
            result.append({'conversations': conversations})
    return result
def to_json(self):
    """
    Write the conversations to ``{remark}_train.json`` and ``{remark}_dev.json``.

    Conversations are split with a 60-unit gap, shuffled, and the first
    20% go to the dev file while the rest go to the train file.
    ``okSignal`` is emitted with 1 when both files are written.

    @return: None
    """
    print(f"【开始导出 json {self.contact.remark}")
    origin_path = self.origin_path
    os.makedirs(origin_path, exist_ok=True)
    filename = os.path.join(origin_path, f"{self.contact.remark}")
    res = self.split_by_intervals(60)
    # Shuffle so the train/dev split is random.
    random.shuffle(res)
    split_ratio = 0.2  # 20% for the second list
    split_point = int(len(res) * split_ratio)
    datasets = (
        (f'{filename}_train.json', res[split_point:]),
        (f'{filename}_dev.json', res[:split_point]),
    )
    for target, data in datasets:
        with open(target, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
    self.okSignal.emit(1)
def run(self):
    """Entry point of the exporter: delegate to ``to_json``."""
    return self.to_json()

View File

@@ -0,0 +1,146 @@
import os
from app.DataBase import msg_db
from app.util.exporter.exporter import ExporterBase
from app.config import OUTPUT_DIR
from app.util.compress_content import parser_reply, share_card
class TxtExporter(ExporterBase):
def text(self, doc, message):
str_content = message[7]
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
name = display_name
doc.write(
f'''{str_time} {name}\n{str_content}\n\n'''
)
def image(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[图片]\n\n'''
)
def audio(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[语音]\n\n'''
)
def emoji(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[表情包]\n\n'''
)
def file(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[文件]\n\n'''
)
def refermsg(self, doc, message):
"""
处理回复消息
@param doc:
@param message:
@return:
"""
str_time = message[8]
is_send = message[4]
content = parser_reply(message[11])
refer_msg = content.get('refer')
display_name = self.get_display_name(is_send, message)
if refer_msg:
doc.write(
f'''{str_time} {display_name}\n{content.get('title')}\n引用:{refer_msg.get('displayname')}:{refer_msg.get('content')}\n\n'''
)
else:
doc.write(
f'''{str_time} {display_name}\n{content.get('title')}\n引用:未知\n\n'''
)
def system_msg(self, doc, message):
str_content = message[7]
str_time = message[8]
str_content = str_content.replace('<![CDATA[', "").replace(
' <a href="weixin://revoke_edit_click">重新编辑</a>]]>', "")
doc.write(
f'''{str_time} {str_content}\n\n'''
)
def video(self, doc, message):
str_time = message[8]
is_send = message[4]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[视频]\n\n'''
)
def music_share(self, doc, message):
is_send = message[4]
str_time = message[8]
display_name = self.get_display_name(is_send, message)
doc.write(
f'''{str_time} {display_name}\n[音乐分享]\n\n'''
)
def share_card(self, doc, message):
# Write a shared-link ("card") message: time, sender, then the card's
# title, description, url and app name.
#
# doc:     writable text stream receiving the export
# message: message row; indexes 4, 8, 10 and 11 are read below
#
# The card fields come from the module-level share_card() helper (same name
# as this method, defined outside this block), which receives the raw
# BytesExtra blob (message[10]) and the compressed content (message[11]).
is_send = message[4]
bytesExtra = message[10]
compress_content_ = message[11]
str_time = message[8]
card_data = share_card(bytesExtra, compress_content_)
display_name = self.get_display_name(is_send, message)
# NOTE(review): the template below is a multi-line literal, so any leading
# whitespace on its continuation lines is part of the exported text.
doc.write(
f'''{str_time} {display_name}
[链接]:title:{card_data.get('title')}
description:{card_data.get('description')}
url:{card_data.get('url')}
name:{card_data.get('app_name')}
\n\n'''
)
def export(self):
    """
    Export the contact's chat history to a UTF-8 text file.

    The file is written to <cwd>/OUTPUT_DIR/聊天记录/<remark>/<remark>.txt.
    Progress (0-100) is emitted once per message and okSignal(1) is emitted
    when the file has been written. A message is written only when its kind
    is enabled in self.message_types.

    @return: None
    """
    remark = self.contact.remark
    print(f"【开始导出 TXT {remark}")
    origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', remark)
    os.makedirs(origin_path, exist_ok=True)
    filename = os.path.join(origin_path, remark + '.txt')
    messages = msg_db.get_messages(self.contact.wxid, time_range=self.time_range)
    total_steps = len(messages)

    # Message type -> writer; the type itself is the message_types key.
    plain_writers = {
        1: self.text,
        3: self.image,
        34: self.audio,
        43: self.video,
        47: self.emoji,
        10000: self.system_msg,
    }
    # Sub-type of a type-49 message -> (message_types key, writer).
    # Replies (57) are gated by the plain-text switch (key 1).
    app_writers = {
        57: (1, self.refermsg),
        6: (4906, self.file),
        3: (4903, self.music_share),
        5: (4905, self.share_card),
    }

    with open(filename, mode='w', newline='', encoding='utf-8') as f:
        for index, message in enumerate(messages):
            type_ = message[2]
            sub_type = message[3]
            self.progressSignal.emit(int((index + 1) / total_steps * 100))
            if type_ in plain_writers:
                if self.message_types.get(type_):
                    plain_writers[type_](f, message)
            elif type_ == 49 and sub_type in app_writers:
                switch_key, writer = app_writers[sub_type]
                if self.message_types.get(switch_key):
                    writer(f, message)
    print(f"【完成导出 TXT {remark}")
    self.okSignal.emit(1)

466
app/util/exporter/output.py Normal file
View File

@@ -0,0 +1,466 @@
import csv
import os
import time
import traceback
from typing import List
import docx
from PyQt5.QtCore import pyqtSignal, QThread
from PyQt5.QtWidgets import QFileDialog
from docx.oxml.ns import qn
from docxcompose.composer import Composer
from app.util.exporter.exporter_ai_txt import AiTxtExporter
from app.util.exporter.exporter_csv import CSVExporter
from app.util.exporter.exporter_docx import DocxExporter
from app.util.exporter.exporter_html import HtmlExporter
from app.util.exporter.exporter_json import JsonExporter
from app.util.exporter.exporter_txt import TxtExporter
from app.DataBase.hard_link import decodeExtraBuf
from app.config import OUTPUT_DIR
from app.DataBase.package_msg import PackageMsg
from app.DataBase import media_msg_db, hard_link_db, micro_msg_db, msg_db
from app.log import logger
from app.person import Me
from app.util.image import get_image
os.makedirs(os.path.join(OUTPUT_DIR, '聊天记录'), exist_ok=True)
class Output(QThread):
    """
    Thread object that starts and tracks chat-history export jobs.

    Depending on ``type_`` it exports one contact to a single format, dumps
    all messages or all contacts to CSV, or (``Batch``) exports every contact
    in ``contact`` to every format listed in ``sub_type``. The per-format
    work runs in separate exporter threads whose signals are forwarded
    through the signals declared on this class.
    """
    startSignal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)
    rangeSignal = pyqtSignal(int)
    okSignal = pyqtSignal(int)
    batchOkSignal = pyqtSignal(int)
    nowContact = pyqtSignal(str)
    i = 1
    # Export kinds accepted as ``type_`` (and, for batch runs, in ``sub_type``).
    CSV = 0
    DOCX = 1
    HTML = 2
    CSV_ALL = 3
    CONTACT_CSV = 4
    TXT = 5
    JSON = 6
    AI_TXT = 7
    Batch = 10086

    def __init__(self, contact, type_=DOCX, message_types=None, sub_type=None, time_range=None, parent=None):
        """
        @param contact: contact to export; a list of contacts for batch export
        @param type_: one of the export-kind constants of this class
        @param message_types: mapping of message type -> enabled flag
        @param sub_type: export kinds to produce in a batch export
        @param time_range: passed through to the exporters / database queries
        @param parent: Qt parent object
        """
        super().__init__(parent)
        self.children = []
        self.last_timestamp = 0
        # None instead of literal [] / {} defaults, so instances never share
        # one mutable default object.
        self.sub_type = [] if sub_type is None else sub_type
        self.time_range = time_range
        self.message_types = {} if message_types is None else message_types
        self.sec = 2  # NOTE(review): the original comment claimed "default 1000 seconds"
        self.contact = contact
        self.msg_id = 0
        self.output_type: int | List[int] = type_
        self.total_num = 1
        self.num = 0
        # Batch bookkeeping; re-initialised by batch_export().
        self.batch_num_total = 0
        self.batch_num = 0

    def progress(self, value):
        """Forward a child's progress value through progressSignal."""
        self.progressSignal.emit(value)

    def output_image(self):
        """
        Export all images (not implemented).
        @return:
        """
        return

    def output_emoji(self):
        """
        Export all stickers (not implemented).
        @return:
        """
        return

    def to_csv_all(self):
        """
        Export every chat message to a CSV file chosen in a save dialog.
        @return:
        """
        origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录')
        os.makedirs(origin_path, exist_ok=True)
        filename = QFileDialog.getSaveFileName(None, "save file", os.path.join(os.getcwd(), 'messages.csv'),
                                               "csv files (*.csv);;all files(*.*)")
        if not filename[0]:
            # Dialog cancelled.
            return
        self.startSignal.emit(1)
        filename = filename[0]
        columns = ['localId', 'TalkerId', 'Type', 'SubType',
                   'IsSender', 'CreateTime', 'Status', 'StrContent',
                   'StrTime', 'Remark', 'NickName', 'Sender']
        packagemsg = PackageMsg()
        messages = packagemsg.get_package_message_all()
        # utf-8-sig writes a BOM so spreadsheet tools detect the encoding.
        with open(filename, mode='w', newline='', encoding='utf-8-sig') as file:
            writer = csv.writer(file)
            writer.writerow(columns)
            writer.writerows(messages)
        self.okSignal.emit(1)

    def contact_to_csv(self):
        """
        Export the contact list to a CSV file chosen in a save dialog.
        @return:
        """
        filename = QFileDialog.getSaveFileName(None, "save file", os.path.join(os.getcwd(), 'contacts.csv'),
                                               "csv files (*.csv);;all files(*.*)")
        if not filename[0]:
            # Dialog cancelled.
            return
        self.startSignal.emit(1)
        filename = filename[0]
        columns = ['UserName', 'Alias', 'Type', 'Remark', 'NickName', 'PYInitial', 'RemarkPYInitial', 'smallHeadImgUrl',
                   'bigHeadImgUrl', 'label', 'gender', 'telephone', 'signature', 'country/region', 'province', 'city']
        contacts = micro_msg_db.get_contact()
        with open(filename, mode='w', newline='', encoding='utf-8-sig') as file:
            writer = csv.writer(file)
            writer.writerow(columns)
            for contact in contacts:
                # contact[9] is the extra blob decoded into gender/telephone/...
                detail = decodeExtraBuf(contact[9])
                gender_code = detail.get('gender')
                # NOTE(review): both non-zero branches used to write an empty
                # string, leaving the column blank for every known gender.
                # Labels restored as 男 (code 1) / 女 (any other code);
                # confirm the code -> label mapping.
                if gender_code == 0:
                    gender = '未知'
                elif gender_code == 1:
                    gender = '男'
                else:
                    gender = '女'
                # Three blanks keep the row aligned with the
                # country/province/city columns when no region is available.
                region = detail.get('region') or ('', '', '')
                writer.writerow([*contact[:9], contact[10], gender, detail.get('telephone'),
                                 detail.get('signature'), *region])
        self.okSignal.emit(1)

    def batch_export(self):
        """Start one exporter per (contact, export kind) pair of a batch run."""
        print('开始批量导出')
        print(self.sub_type, self.message_types)
        print(len(self.contact))
        print([contact.remark for contact in self.contact])
        self.batch_num_total = len(self.contact) * len(self.sub_type)
        self.batch_num = 0
        self.rangeSignal.emit(self.batch_num_total)
        exporters = {
            self.DOCX: self.to_docx,
            self.TXT: self.to_txt,
            self.AI_TXT: self.to_ai_txt,
            self.CSV: self.to_csv,
            self.HTML: self.to_html,
            self.JSON: self.to_json,
        }
        for contact in self.contact:
            for type_ in self.sub_type:
                exporter = exporters.get(type_)
                if exporter is not None:
                    exporter(contact, self.message_types, True)

    def batch_finish_one(self, num):
        """Record one finished batch job; emit okSignal after the last one."""
        # Jobs are created contact-major, so the integer division maps the
        # running job counter back to a contact index.
        self.nowContact.emit(self.contact[self.batch_num // len(self.sub_type)].remark)
        self.batch_num += 1
        if self.batch_num == self.batch_num_total:
            self.okSignal.emit(1)

    @staticmethod
    def _new_composer():
        """Create an empty document with the export fonts, wrapped in a Composer."""
        doc = docx.Document()
        doc.styles["Normal"].font.name = "Cambria"
        doc.styles["Normal"]._element.rPr.rFonts.set(qn("w:eastAsia"), "宋体")
        return Composer(doc)

    def _save_document(self, file):
        """Save self.document to ``file``; on PermissionError use a timestamped name."""
        try:
            self.document.save(file)
        except PermissionError:
            # Typically the target is open elsewhere: insert the current
            # timestamp before the '.docx' suffix and retry once.
            file = file[:-5] + f'{time.time()}' + '.docx'
            self.document.save(file)

    def merge_docx(self, n):
        """
        Slot for the docx exporter: merge part ``n`` into the combined document.

        ``n == 10086`` marks the end of the export: the combined document is
        saved as <remark>.docx and okSignal is emitted. Otherwise the part
        file <remark>_<n>.docx is appended and deleted; after every 50th part
        the combined document is saved as <remark>-<n // 50>.docx and a
        fresh one is started.
        """
        conRemark = self.contact.remark
        origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', conRemark)
        if n == 10086:
            self._save_document(os.path.join(origin_path, f'{conRemark}.docx'))
            self.okSignal.emit(1)
            return
        filename = f"{origin_path}/{conRemark}_{n}.docx"
        doc = docx.Document(filename)
        self.document.append(doc)
        os.remove(filename)
        if n % 50 == 0:
            self._save_document(os.path.join(origin_path, f'{conRemark}-{n // 50}.docx'))
            self.document = self._new_composer()

    def _launch(self, child, is_batch, finished_slot):
        """Register an exporter thread, wire its signals and start it."""
        self.children.append(child)  # keep a reference while the thread runs
        child.progressSignal.connect(self.progress)
        if not is_batch:
            child.rangeSignal.connect(self.rangeSignal)
        child.okSignal.connect(finished_slot)
        child.start()

    def _finished_slot(self, is_batch):
        """Return the slot notified when a simple (single-thread) export ends."""
        return self.batch_finish_one if is_batch else self.okSignal

    def to_docx(self, contact, message_types, is_batch=False):
        """Start a DOCX export of ``contact``."""
        self.document = self._new_composer()
        Child = DocxExporter(contact, type_=self.DOCX, message_types=message_types, time_range=self.time_range)
        self._launch(Child, is_batch, self.batch_finish_one if is_batch else self.merge_docx)

    def to_json(self, contact, message_types, is_batch=False):
        """Start a JSON export of ``contact``."""
        Child = JsonExporter(contact, type_=self.JSON, message_types=message_types, time_range=self.time_range)
        self._launch(Child, is_batch, self._finished_slot(is_batch))

    def to_txt(self, contact, message_types, is_batch=False):
        """Start a TXT export of ``contact``."""
        Child = TxtExporter(contact, type_=self.TXT, message_types=message_types, time_range=self.time_range)
        self._launch(Child, is_batch, self._finished_slot(is_batch))

    def to_ai_txt(self, contact, message_types, is_batch=False):
        """Start an AI-TXT export of ``contact``."""
        # NOTE(review): passes type_=self.TXT rather than self.AI_TXT, as the
        # original did — confirm whether the exporter relies on this.
        Child = AiTxtExporter(contact, type_=self.TXT, message_types=message_types, time_range=self.time_range)
        self._launch(Child, is_batch, self._finished_slot(is_batch))

    def _start_side_thread(self, thread):
        """Start a media thread that must finish before an HTML export is done."""
        self.total_num += 1
        self.children.append(thread)
        thread.okSingal.connect(self.count_finish_num)
        thread.progressSignal.connect(self.progressSignal)
        thread.start()

    def to_html(self, contact, message_types, is_batch=False):
        """
        Start an HTML export of ``contact`` plus one extra thread per enabled
        media kind (voice 34, sticker 47, image 3); completion is counted in
        count_finish_num.
        """
        Child = HtmlExporter(contact, type_=self.output_type, message_types=message_types, time_range=self.time_range)
        self._launch(Child, is_batch, self.count_finish_num)
        self.total_num = 1
        if message_types.get(34):
            self._start_side_thread(OutputMedia(contact, time_range=self.time_range))
        if message_types.get(47):
            self._start_side_thread(OutputEmoji(contact, time_range=self.time_range))
        if message_types.get(3):
            self._start_side_thread(OutputImage(contact, time_range=self.time_range))

    def to_csv(self, contact, message_types, is_batch=False):
        """Start a CSV export of ``contact``."""
        Child = CSVExporter(contact, type_=self.CSV, message_types=message_types, time_range=self.time_range)
        self._launch(Child, is_batch, self._finished_slot(is_batch))

    def run(self):
        """Thread entry point: dispatch on the export kind given at construction."""
        # Plain == comparisons are kept (no dict lookup) because output_type
        # is annotated as possibly being a list, which is unhashable.
        if self.output_type == self.DOCX:
            self.to_docx(self.contact, self.message_types)
        elif self.output_type == self.CSV_ALL:
            self.to_csv_all()
        elif self.output_type == self.CONTACT_CSV:
            self.contact_to_csv()
        elif self.output_type == self.TXT:
            self.to_txt(self.contact, self.message_types)
        elif self.output_type == self.AI_TXT:
            self.to_ai_txt(self.contact, self.message_types)
        elif self.output_type == self.CSV:
            self.to_csv(self.contact, self.message_types)
        elif self.output_type == self.HTML:
            self.to_html(self.contact, self.message_types)
        elif self.output_type == self.JSON:
            self.to_json(self.contact, self.message_types)
        elif self.output_type == self.Batch:
            self.batch_export()

    def count_finish_num(self, num):
        """
        Count finished child threads of an HTML export.
        @param num: value carried by the child's signal (unused)
        @return:
        """
        self.num += 1
        if self.num == self.total_num:
            # Every child thread has finished: report completion.
            if self.output_type == self.Batch:
                self.batch_finish_one(1)
            else:
                self.okSignal.emit(1)
            self.num = 0

    def cancel(self):
        """Request interruption of this thread."""
        self.requestInterruption()
class OutputMedia(QThread):
    """
    Thread that exports the voice messages (type 34) of one contact.

    Emits progressSignal(1) after each message and okSingal(34) at the end.
    """
    okSingal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)

    def __init__(self, contact, time_range=None):
        super().__init__()
        self.contact = contact
        self.time_range = time_range

    def run(self):
        """Extract every voice message into <contact dir>/voice."""
        origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        messages = msg_db.get_messages_by_type(self.contact.wxid, 34, time_range=self.time_range)
        for message in messages:
            msgSvrId = message[9]
            try:
                media_msg_db.get_audio(msgSvrId, output_path=origin_path + "/voice")
            except Exception:
                # One broken clip must not abort the export; log and continue.
                logger.error(traceback.format_exc())
            finally:
                self.progressSignal.emit(1)
        self.okSingal.emit(34)
class OutputEmoji(QThread):
    """
    Thread for exporting the stickers (type 47) of one contact.

    The download step is currently disabled (it was commented out in the
    original), so the thread only reports progress: progressSignal(1) per
    message and okSingal(47) at the end.
    """
    okSingal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)

    def __init__(self, contact, time_range=None):
        super().__init__()
        self.contact = contact
        self.time_range = time_range

    def run(self):
        """Report one progress tick per sticker message, then completion."""
        messages = msg_db.get_messages_by_type(self.contact.wxid, 47, time_range=self.time_range)
        for _message in messages:
            # Disabled step, kept for reference:
            # get_emoji(message[7], thumb=True, output_path=<contact dir> + '/emoji')
            self.progressSignal.emit(1)
        self.okSingal.emit(47)
class OutputImage(QThread):
    """
    Thread that exports the images (type 3) of one contact.

    Emits progressSignal(1) after each message and okSingal(47) at the end.
    """
    okSingal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)

    def __init__(self, contact, time_range):
        super().__init__()
        self.contact = contact
        self.child_thread_num = 2
        self.time_range = time_range
        self.child_threads = [0] * (self.child_thread_num + 1)
        self.num = 0

    def count1(self, num):
        """Count finished child threads; emit okSingal once all are done."""
        self.num += 1
        print('图片导出完成一个')
        if self.num == self.child_thread_num:
            self.okSingal.emit(47)
            print('图片导出完成')

    def run(self):
        """Decode every image of the contact into <contact dir>/image."""
        origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        messages = msg_db.get_messages_by_type(self.contact.wxid, 3, time_range=self.time_range)
        base_path = os.path.join(OUTPUT_DIR, '聊天记录', self.contact.remark, 'image')
        for message in messages:
            str_content = message[7]
            BytesExtra = message[10]
            timestamp = message[5]
            try:
                image_path = hard_link_db.get_image(str_content, BytesExtra, up_dir=Me().wx_dir, thumb=False)
                image_path = get_image(image_path, base_path=base_path)
                try:
                    # Best effort: stamp the exported file with the message
                    # time. image_path is './image/<name>', so [1:] drops
                    # the leading dot before joining.
                    os.utime(origin_path + image_path[1:], (timestamp, timestamp))
                except (OSError, TypeError, ValueError):
                    pass
            except Exception:
                # One broken image must not abort the export; log and continue.
                logger.error(traceback.format_exc())
            finally:
                self.progressSignal.emit(1)
        # NOTE(review): emits 47 although this thread handles type 3 images.
        self.okSingal.emit(47)
class OutputImageChild(QThread):
    """
    Thread that exports a given slice of one contact's image messages.

    Emits progressSignal(1) after each message and okSingal(47) at the end.
    """
    okSingal = pyqtSignal(int)
    progressSignal = pyqtSignal(int)

    def __init__(self, contact, messages, time_range):
        super().__init__()
        self.contact = contact
        self.messages = messages
        self.time_range = time_range

    def run(self):
        """Decode each image, falling back to its thumbnail when the original is missing."""
        origin_path = os.path.join(os.getcwd(), OUTPUT_DIR, '聊天记录', self.contact.remark)
        for message in self.messages:
            str_content = message[7]
            BytesExtra = message[10]
            timestamp = message[5]
            try:
                image_path = hard_link_db.get_image(str_content, BytesExtra, thumb=False)
                if not os.path.exists(os.path.join(Me().wx_dir, image_path)):
                    image_thumb_path = hard_link_db.get_image(str_content, BytesExtra, thumb=True)
                    if not os.path.exists(os.path.join(Me().wx_dir, image_thumb_path)):
                        # Neither the image nor its thumbnail exists; the
                        # finally clause still reports progress.
                        continue
                    image_path = image_thumb_path
                image_path = get_image(image_path, base_path=f'/data/聊天记录/{self.contact.remark}/image')
                try:
                    # Best effort: stamp the exported file with the message time.
                    os.utime(origin_path + image_path[1:], (timestamp, timestamp))
                except (OSError, TypeError, ValueError):
                    pass
            except Exception:
                # One broken image must not abort the export; log and continue.
                logger.error(traceback.format_exc())
            finally:
                self.progressSignal.emit(1)
        self.okSingal.emit(47)
        print('图片子线程完成')
if __name__ == "__main__":
pass

59
app/util/file.py Normal file
View File

@@ -0,0 +1,59 @@
import os
import traceback
import shutil
import requests
from app.log import log, logger
from app.util.protocbuf.msg_pb2 import MessageBytesExtra
from ..person import Me
# Default directory that get_file copies message attachments into.
root_path = './data/files/'
# Create it (and its parent ./data) in one race-free call instead of a
# check-then-mkdir pair per level.
os.makedirs(root_path, exist_ok=True)
class File:
    """Minimal holder of a single boolean flag, ``open_flag``."""

    def __init__(self) -> None:
        # The flag starts cleared.
        self.open_flag: bool = False
def _locate_original_file(file_original_path) -> str:
    """Return where a file recorded in a message should be on disk, or '' if unknown."""
    if os.path.isabs(file_original_path):
        # An absolute path may be stale if the data directory was migrated.
        if os.path.exists(file_original_path):
            return file_original_path
        marker = file_original_path.find("FileStorage")
        if marker != -1:
            # Re-root the "...<sep>FileStorage<sep>..." tail (including the
            # separator in front of it) under the current data directory.
            return Me().wx_dir + file_original_path[marker - 1:]
        return ''
    me = Me()
    if file_original_path.find(me.wxid) != -1:
        # Relative paths may start with the account id, which wx_dir
        # presumably already contains — TODO confirm.
        return me.wx_dir + file_original_path.replace(me.wxid, '')
    return me.wx_dir + file_original_path


def get_file(bytes_extra, file_name, output_path=root_path) -> str:
    """
    Copy the attachment referenced by a message into ``output_path``.

    ``bytes_extra`` is parsed as a MessageBytesExtra; the entry whose
    ``field1`` is 4 carries the attachment's original path.

    :param bytes_extra: serialized MessageBytesExtra of the message
    :param file_name: name to give the copy inside ``output_path``
    :param output_path: destination directory
    :return: path of the copy (also when it already existed), or '' when the
        message has no attachment entry, the source file cannot be found, or
        any error occurs (the error is logged)
    """
    try:
        msg_bytes = MessageBytesExtra()
        msg_bytes.ParseFromString(bytes_extra)
        file_path = ''
        for filed in msg_bytes.message2:
            if filed.field1 != 4:
                continue
            file_original_path = filed.field2
            file_path = os.path.join(output_path, file_name)
            if os.path.exists(file_path):
                # Already exported earlier.
                return file_path
            real_path = _locate_original_file(file_original_path)
            if real_path and os.path.exists(real_path):
                print('开始获取文件' + real_path)
                shutil.copy2(real_path, file_path)
            else:
                # Never hand back a destination that was not written.
                print('文件' + file_original_path + '已丢失')
                file_path = ''
        return file_path
    except Exception:
        logger.error(traceback.format_exc())
        return ""

135
app/util/image.py Normal file
View File

@@ -0,0 +1,135 @@
import os
import traceback
from app.log import logger
from app.person import Me
# Leading magic bytes of the supported image formats, stored as consecutive
# (first byte, second byte) pairs:
#   [0][1] JPEG, [2][3] PNG, [4][5] GIF
pic_head = [0xff, 0xd8, 0x89, 0x50, 0x47, 0x49]
# Module-level placeholder for the XOR key; the functions below use their
# own local value.
decode_code = 0

# get_code() type index -> extension of the decoded picture.
_EXTENSION_BY_TYPE = {1: ".jpg", 3: ".png", 5: ".gif"}


def get_code(dat_read) -> tuple[int, int]:
    """
    Detect the image type of an XOR-obfuscated .dat file and its XOR key.

    For each known format the key is derived from the first byte and
    confirmed against the second byte.

    :param dat_read: at least the first two bytes of the .dat file
    :return: ``(type_index, key)`` where type_index is 1 (JPEG), 3 (PNG) or
        5 (GIF); ``(-1, -1)`` when the input is too short or matches no format
    """
    try:
        if not dat_read or len(dat_read) < 2:
            return -1, -1
        for head_index in range(0, len(pic_head) - 1, 2):
            code = dat_read[0] ^ pic_head[head_index]
            if dat_read[1] ^ code == pic_head[head_index + 1]:
                return head_index + 1, code
        print("not jpg, png, gif")
        return -1, -1
    except Exception:
        logger.error(f'image解析发生了错误:\n\n{traceback.format_exc()}')
        return -1, -1


def _picture_name(file_path, file_type) -> str:
    """Return the output file name: the .dat base name with the detected image extension."""
    # [:-4] drops the 4-character '.dat' suffix; unknown types default to .jpg.
    return os.path.basename(file_path)[:-4] + _EXTENSION_BY_TYPE.get(file_type, ".jpg")


def decode_dat(file_path, out_path) -> str:
    """
    Decode an XOR-obfuscated .dat file into a picture inside ``out_path``.

    :param file_path: path of the .dat file
    :param out_path: directory for the decoded picture (created if missing)
    :return: path of the decoded picture (existing files are reused), or ''
        when the .dat file is missing or is not a recognised image
    """
    if not os.path.exists(file_path):
        return ''
    with open(file_path, 'rb') as file_in:
        header = file_in.read(2)
        file_type, decode_code = get_code(header)
        if decode_code == -1:
            return ''
        file_outpath = os.path.join(out_path, _picture_name(file_path, file_type))
        if os.path.exists(file_outpath):
            # Already decoded: skip reading the rest of the file.
            return file_outpath
        data = header + file_in.read()
    os.makedirs(out_path, exist_ok=True)
    # XOR every byte with the key through a 256-entry translation table
    # (one C-level pass instead of a Python loop over each byte).
    xor_table = bytes(value ^ decode_code for value in range(256))
    with open(file_outpath, 'wb') as file_out:
        file_out.write(data.translate(xor_table))
    print(file_path, '->', file_outpath)
    return file_outpath


def decode_dat_path(file_path, out_path) -> str:
    """
    Compute where ``decode_dat`` would write the picture, without decoding.

    :param file_path: path of the .dat file
    :param out_path: directory the decoded picture would be written to
    :return: the would-be picture path, or '' when the .dat file is missing
        or is not a recognised image
    """
    if not os.path.exists(file_path):
        return ''
    with open(file_path, 'rb') as file_in:
        header = file_in.read(2)
    file_type, decode_code = get_code(header)
    if decode_code == -1:
        return ''
    return os.path.join(out_path, _picture_name(file_path, file_type))
def get_image(path, base_path) -> str:
    """
    Decode a message image and return the link to use for it in an export.

    :param path: image path relative to the WeChat data directory; falsy
        when the message has no image
    :param base_path: output directory, relative to the working directory
    :return: './image/<name>' for a decoded picture, a placeholder URL when
        decoding fails, or the bundled 404 icon resource when ``path`` is falsy
    """
    if not path:
        return ':/icons/icons/404.png'
    target_dir = os.path.join(os.getcwd(), base_path)
    decoded = decode_dat(os.path.join(Me().wx_dir, path), target_dir)
    if decoded:
        return './image/' + os.path.basename(decoded)
    return 'https://www.bing.com/images/search?view=detailV2&ccid=Zww6woP3&id=CCC91337C740656E800E51247E928ACD3052FECF&thid=OIP.Zww6woP3Em49TdSG_lnggAHaEK&mediaurl=https%3a%2f%2fmeekcitizen.files.wordpress.com%2f2018%2f09%2f404.jpg%3fw%3d656&exph=360&expw=640&q=404&simid=608040792714530493&FORM=IRPRST&ck=151E7337A86F1B9C5C5DB08B15B90809&selectedIndex=21&itb=0'
def get_image_abs_path(path, base_path) -> str:
    """
    Decode a message image and return the decoded file's path.

    :param path: image path relative to the WeChat data directory; falsy
        when the message has no image
    :param base_path: output directory, relative to the working directory
    :return: whatever ``decode_dat`` returns for the image, or the bundled
        404 icon resource when ``path`` is falsy
    """
    if not path:
        return ':/icons/icons/404.png'
    target_dir = os.path.join(os.getcwd(), base_path)
    return decode_dat(os.path.join(Me().wx_dir, path), target_dir)
def get_image_path(path, base_path) -> str:
    """
    Return the export link a message image would get, without decoding it.

    :param path: image path relative to the WeChat data directory; falsy
        when the message has no image
    :param base_path: output directory; appended verbatim to the working
        directory, so it is expected to start with a separator
    :return: './image/<name>' when the picture type is recognised, a
        placeholder URL otherwise, or the bundled 404 icon resource when
        ``path`` is falsy
    """
    if not path:
        return ':/icons/icons/404.png'
    target_dir = os.getcwd() + base_path
    would_be = decode_dat_path(os.path.join(Me().wx_dir, path), target_dir)
    if would_be:
        return './image/' + os.path.basename(would_be)
    return 'https://www.bing.com/images/search?view=detailV2&ccid=Zww6woP3&id=CCC91337C740656E800E51247E928ACD3052FECF&thid=OIP.Zww6woP3Em49TdSG_lnggAHaEK&mediaurl=https%3a%2f%2fmeekcitizen.files.wordpress.com%2f2018%2f09%2f404.jpg%3fw%3d656&exph=360&expw=640&q=404&simid=608040792714530493&FORM=IRPRST&ck=151E7337A86F1B9C5C5DB08B15B90809&selectedIndex=21&itb=0'
if __name__ == "__main__":
pass

55
app/util/music.py Normal file
View File

@@ -0,0 +1,55 @@
import os
import traceback
import shutil
from app.log import log, logger
from app.util.protocbuf.msg_pb2 import MessageBytesExtra
import requests
from urllib.parse import urlparse, parse_qs
import re
# Default directory that get_music_path downloads into.
root_path = './data/music/'
# Create it (and its parent ./data) in one race-free call instead of a
# check-then-mkdir pair per level.
os.makedirs(root_path, exist_ok=True)
class File:
    """Minimal holder of a single boolean flag, ``open_flag``."""

    def __init__(self) -> None:
        # The flag starts cleared.
        self.open_flag: bool = False
def get_music_path(url, file_title, output_path=root_path, timeout=30) -> str:
    """
    Download a shared music file and return the local path.

    The file is saved as ``<sanitized title>.<extension from the URL path>``
    inside ``output_path``; an already existing file is reused.

    :param url: download URL of the music file
    :param file_title: title used as the file name; characters that are
        invalid in file names are replaced by '_'
    :param output_path: destination directory
    :param timeout: seconds to wait for the server before giving up, so a
        stalled connection cannot block the export forever
    :return: path of the local file, or '' when the URL has no extension,
        the server does not answer with 200, or any error occurs
    """
    try:
        parsed_url = urlparse(url)
        if '.' not in parsed_url.path:
            print('音乐文件已失效url' + url)
            return ''
        # The text after the last dot of the URL path is the extension.
        file_extension = parsed_url.path.split('.')[-1]
        pattern = r'[\\/:*?"<>|\r\n]+'
        file_title = re.sub(pattern, "_", file_title)
        file_name = file_title + '.' + file_extension
        music_path = os.path.join(output_path, file_name)
        if os.path.exists(music_path):
            return music_path
        header = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.40 Safari/537.36 Edg/87.0.664.24'
        }
        # NOTE(review): certificate verification is disabled (verify=False)
        # and the resulting warnings are silenced, as in the original.
        requests.packages.urllib3.disable_warnings()
        response = requests.get(url, headers=header, verify=False, timeout=timeout)
        if response.status_code != 200:
            print("音乐" + file_name + "获取失败:请求地址:" + url)
            return ''
        with open(music_path, 'wb') as f:
            f.write(response.content)
        return music_path
    except Exception as e:
        print(f"Get Music Path Error: {e}")
        logger.error(traceback.format_exc())
        return ""

81
app/util/path.py Normal file
View File

@@ -0,0 +1,81 @@
import os
import winreg
from app.person import Me
from app.util import image
os.makedirs('./data/image', exist_ok=True)
def get_abs_path(path, base_path="/data/image"):
    """
    Decode a message image and return the decoded file's path.

    :param path: image path relative to the WeChat data directory; falsy
        when there is no image
    :param base_path: output directory; appended verbatim to the working
        directory
    :return: path of the decoded picture, or the bundled 404 icon resource
        when ``path`` is falsy or decoding yields nothing
    """
    placeholder = ':/icons/icons/404.png'
    if not path:
        return placeholder
    target_dir = os.getcwd() + base_path
    decoded = image.decode_dat(os.path.join(Me().wx_dir, path), target_dir)
    return decoded or placeholder
def get_relative_path(path, base_path, type_='image'):
    """
    Decode a message image and return the link to use for it in an export.

    :param path: image path relative to the WeChat data directory; falsy
        when there is no image
    :param base_path: output directory; appended verbatim to the working
        directory
    :param type_: not used by this function
    :return: './image/<name>' for a decoded picture, a placeholder URL when
        decoding fails, or the bundled 404 icon resource when ``path`` is falsy
    """
    if not path:
        return ':/icons/icons/404.png'
    target_dir = os.getcwd() + base_path
    decoded = image.decode_dat(os.path.join(Me().wx_dir, path), target_dir)
    if decoded:
        return './image/' + os.path.basename(decoded)
    return 'https://www.bing.com/images/search?view=detailV2&ccid=Zww6woP3&id=CCC91337C740656E800E51247E928ACD3052FECF&thid=OIP.Zww6woP3Em49TdSG_lnggAHaEK&mediaurl=https%3a%2f%2fmeekcitizen.files.wordpress.com%2f2018%2f09%2f404.jpg%3fw%3d656&exph=360&expw=640&q=404&simid=608040792714530493&FORM=IRPRST&ck=151E7337A86F1B9C5C5DB08B15B90809&selectedIndex=21&itb=0'
def mkdir(path):
    """
    Create directory ``path`` if nothing exists there yet.

    Missing parent directories are created as well. An existing ``path``
    is left untouched.

    :param path: directory to create
    """
    try:
        os.makedirs(path)
    except FileExistsError:
        # Already there (possibly created concurrently) — nothing to do.
        pass
def _documents_dir():
    """Return the current user's Documents folder (registry first, then %USERPROFILE%\\Documents)."""
    try:
        with winreg.OpenKey(winreg.HKEY_CURRENT_USER,
                            r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders") as key:
            documents_path = winreg.QueryValueEx(key, "Personal")[0]
        # The value may contain %VAR% references anywhere in the path, e.g.
        # "%USERPROFILE%\OneDrive\Documents".
        expanded = os.path.expandvars(documents_path)
        if "%" not in expanded:
            return expanded
    except Exception:
        # Best effort: any registry problem falls back to the default below.
        pass
    return os.path.join(os.environ.get("USERPROFILE"), "Documents")


def wx_path():
    """
    Locate the "WeChat Files" directory of the current Windows user.

    Lookup order for the base directory:
      1. registry value ``FileSavePath`` under ``Software\\Tencent\\WeChat``;
      2. the ``3ebffe94.ini`` file in WeChat's roaming configuration;
      3. when either of them says ``MyDocument:`` (or both fail), the
         user's Documents folder.

    :return: ``<base directory>\\WeChat Files``, or '.' when a
        FileNotFoundError escapes the lookup
    """
    try:
        w_dir = "MyDocument:"
        is_w_dir = False
        try:
            # The with-block closes the registry handle even if the query fails.
            with winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ) as key:
                w_dir, _ = winreg.QueryValueEx(key, "FileSavePath")
            is_w_dir = True
        except Exception:
            w_dir = "MyDocument:"
        if not is_w_dir:
            try:
                user_profile = os.environ.get("USERPROFILE")
                path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users",
                                             "config",
                                             "3ebffe94.ini")
                with open(path_3ebffe94, "r", encoding="utf-8") as f:
                    w_dir = f.read()
            except Exception:
                w_dir = "MyDocument:"
        if w_dir == "MyDocument:":
            w_dir = _documents_dir()
        return os.path.join(w_dir, "WeChat Files")
    except FileNotFoundError:
        return '.'

View File

View File

@@ -0,0 +1,18 @@
// Schema for the BytesExtra blob attached to a message. The layout was
// derived from `protoc --decode_raw` output, so the field names are
// placeholders rather than official names.
syntax = "proto3";
package app.protobuf;
option go_package=".;proto";
// Header sub-message (top-level field 1); the meaning of its two integers
// is not known.
message SubMessage1 {
int32 field1 = 1;
int32 field2 = 2;
}
// One tagged string entry: field1 is the tag, field2 the value.
// app/util/file.py reads the entry with field1 == 4 as the attachment's
// original path.
message SubMessage2 {
int32 field1 = 1;
string field2 = 2;
}
// Top-level message: one header plus a repeated list of tagged entries.
// Note that the repeated field uses number 3; number 2 is unused.
message MessageBytesExtra {
SubMessage1 message1 = 1;
repeated SubMessage2 message2 = 3;
}

View File

@@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: msg.proto
"""Generated protocol buffer code."""
# To change this module, edit msg.proto and regenerate it
# (`protoc --python_out=. msg.proto`) instead of editing it by hand.
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
# Register the serialized descriptor of msg.proto in the default descriptor pool.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\tmsg.proto\x12\x0c\x61pp.protobuf\"-\n\x0bSubMessage1\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\x05\x12\x0e\n\x06\x66ield2\x18\x02 \x01(\x05\"-\n\x0bSubMessage2\x12\x0e\n\x06\x66ield1\x18\x01 \x01(\x05\x12\x0e\n\x06\x66ield2\x18\x02 \x01(\t\"m\n\x11MessageBytesExtra\x12+\n\x08message1\x18\x01 \x01(\x0b\x32\x19.app.protobuf.SubMessage1\x12+\n\x08message2\x18\x03 \x03(\x0b\x32\x19.app.protobuf.SubMessage2b\x06proto3')
# Descriptors of the three messages declared in msg.proto.
_SUBMESSAGE1 = DESCRIPTOR.message_types_by_name['SubMessage1']
_SUBMESSAGE2 = DESCRIPTOR.message_types_by_name['SubMessage2']
_MESSAGEBYTESEXTRA = DESCRIPTOR.message_types_by_name['MessageBytesExtra']
# Build one message class per descriptor and register it in the symbol database.
SubMessage1 = _reflection.GeneratedProtocolMessageType('SubMessage1', (_message.Message,), {
'DESCRIPTOR' : _SUBMESSAGE1,
'__module__' : 'msg_pb2'
# @@protoc_insertion_point(class_scope:app.protobuf.SubMessage1)
})
_sym_db.RegisterMessage(SubMessage1)
SubMessage2 = _reflection.GeneratedProtocolMessageType('SubMessage2', (_message.Message,), {
'DESCRIPTOR' : _SUBMESSAGE2,
'__module__' : 'msg_pb2'
# @@protoc_insertion_point(class_scope:app.protobuf.SubMessage2)
})
_sym_db.RegisterMessage(SubMessage2)
MessageBytesExtra = _reflection.GeneratedProtocolMessageType('MessageBytesExtra', (_message.Message,), {
'DESCRIPTOR' : _MESSAGEBYTESEXTRA,
'__module__' : 'msg_pb2'
# @@protoc_insertion_point(class_scope:app.protobuf.MessageBytesExtra)
})
_sym_db.RegisterMessage(MessageBytesExtra)
# The assignments below run only when the C descriptor implementation is not
# in use; they record each message's start/end offsets in the serialized
# descriptor above.
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_SUBMESSAGE1._serialized_start=27
_SUBMESSAGE1._serialized_end=72
_SUBMESSAGE2._serialized_start=74
_SUBMESSAGE2._serialized_end=119
_MESSAGEBYTESEXTRA._serialized_start=121
_MESSAGEBYTESEXTRA._serialized_end=230
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,34 @@
# 说明
## 解析
```shell
protoc --decode_raw < msg_data.txt
```
## 根据解析结果编写 .proto 文件
```text
1 {
1: 16
2: 0
}
3 {
1: 1
2: "wxid_4b1t09d63spw22"
}
3 {
1: 7
2: "<msgsource>\n\t<alnode>\n\t\t<fr>2</fr>\n\t</alnode>\n\t<sec_msg_node>\n\t\t<uuid>c6680ab2c57499a1a22e44a7eada76e8_</uuid>\n\t</sec_msg_node>\n\t<silence>1</silence>\n\t<membercount>198</membercount>\n\t<signature>v1_Gj7hfmi5</signature>\n\t<tmp_node>\n\t\t<publisher-id></publisher-id>\n\t</tmp_node>\n</msgsource>\n"
}
3 {
1: 2
2: "c13acbc95512d1a59bb686d684fd64d8"
}
3 {
1: 4
2: "yiluoAK_47\\FileStorage\\Cache\\2023-08\\2286b5852db82f6cbd9c2084ccd52358"
}
```
## 生成python文件
```shell
protoc --python_out=. msg.proto
```

View File

@@ -0,0 +1,19 @@
// Schema for a chat room's data blob: the member list plus several
// numeric fields.
syntax = "proto3";
package app.protobuf;
option go_package=".;proto";
message ChatRoomData {
// One member of the chat room.
message ChatRoomMember {
string wxID = 1;
string displayName = 2;
int32 state = 3;
}
repeated ChatRoomMember members = 1;
// field_N names are placeholders; their meaning is not documented here.
int32 field_2 = 2;
int32 field_3 = 3;
int32 field_4 = 4;
int32 room_capacity = 5;
int32 field_6 = 6;
int64 field_7 = 7;
int64 field_8 = 8;
}

View File

@@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: roomdata.proto
"""Generated protocol buffer code."""
# To change this module, edit roomdata.proto and regenerate it with protoc
# instead of editing it by hand.
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
# Register the serialized descriptor of roomdata.proto in the default descriptor pool.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0eroomdata.proto\x12\x0c\x61pp.protobuf\"\x8b\x02\n\x0c\x43hatRoomData\x12:\n\x07members\x18\x01 \x03(\x0b\x32).app.protobuf.ChatRoomData.ChatRoomMember\x12\x0f\n\x07\x66ield_2\x18\x02 \x01(\x05\x12\x0f\n\x07\x66ield_3\x18\x03 \x01(\x05\x12\x0f\n\x07\x66ield_4\x18\x04 \x01(\x05\x12\x15\n\rroom_capacity\x18\x05 \x01(\x05\x12\x0f\n\x07\x66ield_6\x18\x06 \x01(\x05\x12\x0f\n\x07\x66ield_7\x18\x07 \x01(\x03\x12\x0f\n\x07\x66ield_8\x18\x08 \x01(\x03\x1a\x42\n\x0e\x43hatRoomMember\x12\x0c\n\x04wxID\x18\x01 \x01(\t\x12\x13\n\x0b\x64isplayName\x18\x02 \x01(\t\x12\r\n\x05state\x18\x03 \x01(\x05\x62\x06proto3')
# Descriptors of ChatRoomData and its nested ChatRoomMember message.
_CHATROOMDATA = DESCRIPTOR.message_types_by_name['ChatRoomData']
_CHATROOMDATA_CHATROOMMEMBER = _CHATROOMDATA.nested_types_by_name['ChatRoomMember']
# Build the message classes; the nested class is an attribute of the outer one.
ChatRoomData = _reflection.GeneratedProtocolMessageType('ChatRoomData', (_message.Message,), {
'ChatRoomMember' : _reflection.GeneratedProtocolMessageType('ChatRoomMember', (_message.Message,), {
'DESCRIPTOR' : _CHATROOMDATA_CHATROOMMEMBER,
'__module__' : 'roomdata_pb2'
# @@protoc_insertion_point(class_scope:app.protobuf.ChatRoomData.ChatRoomMember)
})
,
'DESCRIPTOR' : _CHATROOMDATA,
'__module__' : 'roomdata_pb2'
# @@protoc_insertion_point(class_scope:app.protobuf.ChatRoomData)
})
_sym_db.RegisterMessage(ChatRoomData)
_sym_db.RegisterMessage(ChatRoomData.ChatRoomMember)
# The assignments below run only when the C descriptor implementation is not
# in use; they record each message's start/end offsets in the serialized
# descriptor above.
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_CHATROOMDATA._serialized_start=33
_CHATROOMDATA._serialized_end=300
_CHATROOMDATA_CHATROOMMEMBER._serialized_start=234
_CHATROOMDATA_CHATROOMMEMBER._serialized_end=300
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,361 @@
# Mapping from romanized (pinyin/English) province-level region names to Chinese.
province_mapping = {
    'Anhui': '安徽',
    'Beijing': '北京',
    'Chongqing': '重庆',
    'Fujian': '福建',
    'Gansu': '甘肃',
    'Guangdong': '广东',
    'Guangxi': '广西',
    'Guizhou': '贵州',
    'Hainan': '海南',
    'Hebei': '河北',
    'Heilongjiang': '黑龙江',
    'Henan': '河南',
    'Hong Kong': '香港',
    'Hubei': '湖北',
    'Hunan': '湖南',
    'Inner Mongolia': '内蒙古',
    'Jiangsu': '江苏',
    'Jiangxi': '江西',
    'Jilin': '吉林',
    'Liaoning': '辽宁',
    'Macau': '澳门',
    'Ningxia': '宁夏',
    'Qinghai': '青海',
    'Shaanxi': '陕西',
    'Shandong': '山东',
    'Shanghai': '上海',
    'Shanxi': '山西',
    'Sichuan': '四川',
    'Taiwan': '台湾',
    'Tianjin': '天津',
    'Tibet': '西藏',
    'Xinjiang': '新疆',
    'Yunnan': '云南',
    'Zhejiang': '浙江',
    'Taipei': '台北',
}

# Mapping from country/region codes to the Chinese display name.
country_mapping = {
    'CN': '中国大陆',
    'TW': '中国台湾',
    'GB': "英国",
}

# Mapping from romanized city names to Chinese, keyed by city name only.
#
# Several distinct cities share one romanized spelling. A dict can hold only one
# value per key, so for those names this table keeps the value that the original
# literal effectively produced (the later duplicate won); the province-aware
# table `_CITY_BY_PROVINCE` below resolves them correctly when the province is
# known.
city_mapping = {
    "Beijing": "北京",
    "Tianjin": "天津",
    "Shanghai": "上海",
    "Chongqing": "重庆",
    "Yinchuan": "银川",
    "Shizuishan": "石嘴山",
    "Wuzhong": "吴忠",
    "Guyuan": "固原",
    "Zhongwei": "中卫",
    "Wulumuqi": "乌鲁木齐",
    "Kelamayi": "克拉玛依",
    "Lasa": "拉萨",
    "Huhehaote": "呼和浩特",
    "Baotou": "包头",
    "Wuhai": "乌海",
    "Chifeng": "赤峰",
    "Tongliao": "通辽",
    "Eerduosi": "鄂尔多斯",
    "Hulunbeier": "呼伦贝尔",
    "Bayannaoer": "巴彦淖尔",
    "Wulanchabu": "乌兰察布",
    "Nanning": "南宁",
    "Liuzhou": "柳州",
    "Guilin": "桂林",
    "Wuzhou": "梧州",
    "Beihai": "北海",
    "Chongzuo": "崇左",
    "Laibin": "来宾",
    "Hezhou": "贺州",
    # "Yulin" (玉林) is handled in _CITY_BY_PROVINCE; see the later "Yulin" key.
    "Baise": "百色",
    "Hechi": "河池",
    "Qinzhou": "钦州",
    "Fangchenggang": "防城港",
    "Guigang": "贵港",
    "Harbin": "哈尔滨",
    "Daqing": "大庆",
    "Qiqihaer": "齐齐哈尔",
    "Jiamusi": "佳木斯",
    "Jixi": "鸡西",
    "Hegang": "鹤岗",
    "Shuangyashan": "双鸭山",
    "Mudanjiang": "牡丹江",
    # "Yichun" (伊春) is handled in _CITY_BY_PROVINCE; see the later "Yichun" key.
    "Qitaihe": "七台河",
    "Heihe": "黑河",
    "Suihua": "绥化",
    "Changchun": "长春",
    "Jilin": "吉林",
    "Siping": "四平",
    "Liaoyuan": "辽源",
    "Tonghua": "通化",
    "Baishan": "白山",
    "Songyuan": "松原",
    "Baicheng": "白城",
    "Shenyang": "沈阳",
    "Dalian": "大连",
    "Anshan": "鞍山",
    "Fushun": "抚顺",
    "Benxi": "本溪",
    "Dandong": "丹东",
    "Jinzhou": "锦州",
    "Yingkou": "营口",
    "Fuxin": "阜新",
    "Liaoyang": "辽阳",
    "Panjin": "盘锦",
    "Tieling": "铁岭",
    "Chaoyang": "朝阳",
    "Huludao": "葫芦岛",
    "Shijiazhuang": "石家庄",
    "Tangshan": "唐山",
    "Handan": "邯郸",
    "Qinghuangdao": "秦皇岛",  # misspelled key, kept for backward compatibility
    "Qinhuangdao": "秦皇岛",
    "Baoding": "保定",
    "Zhangjiakou": "张家口",
    "Chengde": "承德",
    "Langfang": "廊坊",
    "Cangzhou": "沧州",
    "Hengshui": "衡水",
    "Xingtai": "邢台",
    "Jinan": "济南",
    "Qingdao": "青岛",
    "Zibo": "淄博",
    "Zaozhuang": "枣庄",
    "Dongying": "东营",
    "Yantai": "烟台",
    "Weifang": "潍坊",
    "Jining": "济宁",
    "Taian": "泰安",
    "Weihai": "威海",
    "Rizhao": "日照",
    "Laiwu": "莱芜",
    "Linyi": "临沂",
    "Dezhou": "德州",
    "Liaocheng": "聊城",
    "Heze": "菏泽",
    "Binzhou": "滨州",
    "Nanjing": "南京",
    "Zhenjiang": "镇江",
    "Changzhou": "常州",
    "Wuxi": "无锡",
    # "Suzhou" (苏州) is handled in _CITY_BY_PROVINCE; see the later "Suzhou" key.
    "Xuzhou": "徐州",
    "Lianyungang": "连云港",
    "Huaian": "淮安",
    "Yancheng": "盐城",
    "Yangzhou": "扬州",
    # "Taizhou" (泰州) is handled in _CITY_BY_PROVINCE; see the later "Taizhou" key.
    "Nantong": "南通",
    "Suqian": "宿迁",
    "Hefei": "合肥",
    "Bengbu": "蚌埠",
    "Wuhu": "芜湖",
    "Huainan": "淮南",
    "Bozhou": "亳州",
    "Fuyang": "阜阳",
    "Huaibei": "淮北",
    "Suzhou": "宿州",
    "Chuzhou": "滁州",
    "Anqing": "安庆",
    "Chaohu": "巢湖",
    "Maanshan": "马鞍山",
    "Xuancheng": "宣城",
    "Huangshan": "黄山",
    "Chizhou": "池州",
    "Tongling": "铜陵",
    "Hangzhou": "杭州",
    "Jiaxing": "嘉兴",
    "Huzhou": "湖州",
    "Ningbo": "宁波",
    "Jinhua": "金华",
    "Wenzhou": "温州",
    "Lishui": "丽水",
    "Shaoxing": "绍兴",
    "Quzhou": "衢州",
    "Zhoushan": "舟山",
    "Taizhou": "台州",
    # "Fuzhou" (福州) is handled in _CITY_BY_PROVINCE; see the later "Fuzhou" key.
    "Xiamen": "厦门",
    "Quanzhou": "泉州",
    "Sanming": "三明",
    "Nanping": "南平",
    "Zhangzhou": "漳州",
    "Putian": "莆田",
    "Ningde": "宁德",
    "Longyan": "龙岩",
    "Guangzhou": "广州",
    "Shenzhen": "深圳",
    "Shantou": "汕头",
    "Huizhou": "惠州",
    "Zhuhai": "珠海",
    "Jieyang": "揭阳",
    "Foshan": "佛山",
    "Heyuan": "河源",
    "Yangjiang": "阳江",
    "Maoming": "茂名",
    "Zhanjiang": "湛江",
    "Meizhou": "梅州",
    "Zhaoqing": "肇庆",
    "Shaoguan": "韶关",
    "Chaozhou": "潮州",
    "Dongguan": "东莞",
    "Zhongshan": "中山",
    "Qingyuan": "清远",
    "Jiangmen": "江门",
    "Shanwei": "汕尾",
    "Yunfu": "云浮",
    "Haikou": "海口",
    "Sanya": "三亚",
    "Kunming": "昆明",
    "Qujing": "曲靖",
    "Yuxi": "玉溪",
    "Baoshan": "保山",
    "Zhaotong": "昭通",
    "Lijiang": "丽江",
    "Puer": "普洱",
    "Lincang": "临沧",
    "Guiyang": "贵阳",
    "Liupanshui": "六盘水",
    "Zunyi": "遵义",
    "Anshun": "安顺",
    "Chengdu": "成都",
    "Mianyang": "绵阳",
    "Deyang": "德阳",
    "Guangyuan": "广元",
    "Zigong": "自贡",
    "Panzhihua": "攀枝花",
    "Leshan": "乐山",
    "Nanchong": "南充",
    "Neijiang": "内江",
    "Suining": "遂宁",
    "Guangan": "广安",
    "Luzhou": "泸州",
    "Dazhou": "达州",
    "Meishan": "眉山",
    "Yibin": "宜宾",
    "Yaan": "雅安",
    "Ziyang": "资阳",
    "Changsha": "长沙",
    "Zhuzhou": "株洲",
    "Xiangtan": "湘潭",
    "Hengyang": "衡阳",
    "Yueyang": "岳阳",
    "Chenzhou": "郴州",
    "Yongzhou": "永州",
    "Shaoyang": "邵阳",
    "Huaihua": "怀化",
    "Changde": "常德",
    "Yiyang": "益阳",
    "Zhangjiajie": "张家界",
    "Loudi": "娄底",
    "Wuhan": "武汉",
    "Xiangfan": "襄樊",
    "Yichang": "宜昌",
    "Huangshi": "黄石",
    "Ezhou": "鄂州",
    "Suizhou": "随州",
    "Jingzhou": "荆州",
    "Jingmen": "荆门",
    "Shiyan": "十堰",
    "Xiaogan": "孝感",
    "Huanggang": "黄冈",
    "Xianning": "咸宁",
    "Zhengzhou": "郑州",
    "Luoyang": "洛阳",
    "Kaifeng": "开封",
    "Luohe": "漯河",
    "Anyang": "安阳",
    "Xinxiang": "新乡",
    "Zhoukou": "周口",
    "Sanmenxia": "三门峡",
    "Jiaozuo": "焦作",
    "Pingdingshan": "平顶山",
    "Xinyang": "信阳",
    "Nanyang": "南阳",
    "Hebi": "鹤壁",
    "Puyang": "濮阳",
    "Xuchang": "许昌",
    "Shangqiu": "商丘",
    "Zhumadian": "驻马店",
    "Taiyuan": "太原",
    "DaTong": "大同",  # irregular capitalization, kept for backward compatibility
    "Datong": "大同",
    "Xinzhou": "忻州",
    "Yangquan": "阳泉",
    "Changzhi": "长治",
    "Jincheng": "晋城",
    "Shuozhou": "朔州",
    "Jinzhong": "晋中",
    "Yuncheng": "运城",
    "Linfen": "临汾",
    "Lvliang": "吕梁",
    "Xi'an": "西安",
    "Xianyang": "咸阳",
    "Tongchuan": "铜川",
    "Yanan": "延安",
    "Baoji": "宝鸡",
    "Weinan": "渭南",
    "Hanzhoung": "汉中",  # misspelled key, kept for backward compatibility
    "Hanzhong": "汉中",
    "Ankang": "安康",
    "Shangluo": "商洛",
    "Yulin": "榆林",
    "Lanzhou": "兰州",
    "Tianshui": "天水",
    "Pingliang": "平凉",
    "Jiuquan": "酒泉",
    "Jiayuguan": "嘉峪关",
    "Jinchang": "金昌",
    "baiyiin": "白银",  # misspelled key, kept for backward compatibility
    "Baiyin": "白银",
    "Wuwei": "武威",
    "Zhangye": "张掖",
    "Qingyang": "庆阳",
    "Dingxi": "定西",
    "Longnan": "陇南",
    "Xining": "西宁",
    "Nanchang": "南昌",
    "Jiujiang": "九江",
    "Ganzhou": "赣州",
    "Jian": "吉安",
    "Yingtan": "鹰潭",
    "Shangrao": "上饶",
    "Pingxiang": "萍乡",
    "Jingdezhen": "景德镇",
    "Xinyu": "新余",
    "Yichun": "宜春",
    "Fuzhou": "抚州",
    "Tin Shui": "天水"
}

# Province-aware overrides for romanized city names that are shared by more than
# one city. Keys are (province, city) using the same romanized province names as
# `province_mapping`. Consulted before `city_mapping`.
_CITY_BY_PROVINCE = {
    ('Guangxi', 'Yulin'): '玉林',
    ('Shaanxi', 'Yulin'): '榆林',
    ('Heilongjiang', 'Yichun'): '伊春',
    ('Jiangxi', 'Yichun'): '宜春',
    ('Jiangsu', 'Suzhou'): '苏州',
    ('Anhui', 'Suzhou'): '宿州',
    ('Jiangsu', 'Taizhou'): '泰州',
    ('Zhejiang', 'Taizhou'): '台州',
    ('Fujian', 'Fuzhou'): '福州',
    ('Jiangxi', 'Fuzhou'): '抚州',
}


def conversion_province_to_chinese(province):
    """
    Translate a romanized province name to Chinese.

    :param province: key to look up in ``province_mapping``
    :return: the Chinese name, or an empty string when the province is unknown
    """
    return province_mapping.get(province, '')


def conversion_region_to_chinese(region: tuple):
    """
    Build a space-separated Chinese display string for a region.

    :param region: ``(country, province, city)``; empty/None components are
        skipped, and a region with fewer than three components is treated as
        if the missing trailing components were empty
    :return: ``"<country> <province> <city>"`` with each component translated
        when a mapping exists and left as-is otherwise; ``''`` for an empty region
    """
    if not region:
        return ''
    # Pad so that short regions do not raise IndexError; extra items are ignored.
    country, province, city = (tuple(region) + ('', '', ''))[:3]

    parts = []
    if country:
        parts.append(country_mapping.get(country, country))
    if province:
        parts.append(province_mapping.get(province, province))
    if city:
        # Prefer the province-aware entry so that same-spelled cities
        # (e.g. Fuzhou in Fujian vs. Jiangxi) are told apart.
        city_name = _CITY_BY_PROVINCE.get((province, city))
        if city_name is None:
            city_name = city_mapping.get(city, city)
        parts.append(city_name)
    # Joining only the present parts avoids the trailing space the string
    # concatenation produced when the city or province was missing.
    return ' '.join(parts)

14
app/util/search.py Normal file
View File

@@ -0,0 +1,14 @@
from typing import List
from fuzzywuzzy import process
def search_by_content(key, choices: List[List]):
    """
    Find the best fuzzy match for ``key`` across several candidate lists.

    Each list in ``choices`` is searched with ``process.extractOne``; the match
    with the highest score over all lists wins (the earliest list wins ties).

    :param key: the query to match
    :param choices: a list of candidate lists
    :return: the index of the best-matching item inside the list it was found
        in (not the index of that list within ``choices``)
    :raises IndexError: when ``choices`` is empty or every candidate list is empty
    """
    best_score = None
    best_candidates = None
    best_item = None
    for candidates in choices:
        match = process.extractOne(key, candidates)
        # extractOne returns None when there is nothing to match against;
        # skip such lists instead of failing on the score lookup.
        if match is None:
            continue
        item, score = match[0], match[1]
        # Strict '>' keeps the first list on ties, as the stable sort did.
        if best_score is None or score > best_score:
            best_score, best_candidates, best_item = score, candidates, item
    if best_candidates is None:
        # Same exception type the original raised for an empty ``choices``.
        raise IndexError("search_by_content: no candidates to search")
    return best_candidates.index(best_item)