email邮件解析作为比较基础的模块,用来收取邮件、发送邮件。python的mail模块调用几行代码就能写一个发送/接受邮件的脚本。但是如果要做到持续稳定,能够上生产环境的代码,还是需要下一番功夫,解决编码和内容异常的问题。可能遇到的问题如下:
邮件编码问题
邮件日期格式解析
多附件的下载
邮件如何增量解析?
一、连接邮件服务器 首先,将邮件的账户密码配置化:
1 2 3 4 5 6 7 8 # config.py MAIL = { "mail_host": "smtp.exmail.qq.com", # SMTP服务器 "mail_user": "xxx@abc.com", # 用户名 "mail_pwd": "fdaxxx", # 登录密码 "sender": "xxx@abc.com", # 发件人邮箱 "port":465 # SSL默认是465 }
创建邮件连接,获取邮件列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 from config.py import MAIL # 连接到腾讯企业邮箱,其他邮箱调整括号里的参数 conn = imaplib.IMAP4_SSL(MAIL['mail_host'], MAIL['port']) conn.login(MAIL['mail_user'], MAIL['mail_pwd']) # 选定一个邮件文件夹 conn.select("INBOX") # 获取收件箱 # 提取了文件夹中所有邮件的编号 resp, mails = conn.search(None, 'ALL') # 提取了指定编号,按最新时间倒序 mails_list = mails[0].split() mails_list = list(reversed(mails_list)) mail_nums = len(mails_list) for i in range(mail_nums): print("mail: {}/{}".format(i+1, mail_nums)) resp, data = conn.fetch(mails_list[i], '(RFC822)') emailbody = data[0][1] mail = email.message_from_bytes(emailbody)
二.、邮件编码问题 邮件主题中是一般是可以获取到邮件编码的,但也有获取不准的时候,这时就会报错。这需要做编码兼容性处理。 decode_data()函数优先采用邮件内容获取的编码,如果解析不成功,就依次用UTF-8,GBK,GB2312编码来解析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 # 获取邮件自带的编码 from email.header import decode_header mail_encode = decode_header(mail.get("Subject"))[0][1] mail_title = decode_data(decode_header(mail.get("Subject"))[0][0], mail_encode) def decode_data(bytes, added_encode=None): """ 字节解码 :param bytes: :return: """ def _decode(bytes, encoding): try: return str(bytes, encoding=encoding) except Exception as e: return None encodes = ['UTF-8', 'GBK', 'GB2312'] if added_encode: encodes = [added_encode] + encodes for encoding in encodes: str_data = _decode(bytes, encoding) if str_data is not None: return str_data return None
三、邮件日期格式解析 邮件日期的格式一般是Mon, 8 Jun 2020 22:02:41 +0800
这样的,也有8 Jun 2020 22:02:41 +0800
,去掉了星期。 要做到兼容,我只需要解析中间的年月日时分秒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 from datetime import datetime def parse_mail_time(mail_datetime): """ 邮件时间解析 :param bytes: :return: """ print(mail_datetime) GMT_FORMAT = "%a, %d %b %Y %H:%M:%S" GMT_FORMAT2 = "%d %b %Y %H:%M:%S" index = mail_datetime.find(' +0') if index > 0: mail_datetime = mail_datetime[:index] # 去掉+0800 formats = [GMT_FORMAT, GMT_FORMAT2] for ft in formats: try: mail_datetime = datetime.strptime(mail_datetime, ft) return mail_datetime except: pass raise Exception("邮件时间格式解析错误")
四、邮件增量解析 我们定义邮件的表结构如下:
1 2 3 4 5 6 7 8 9 10 CREATE TABLE `mail_record_history` ( `receive_time` datetime NOT NULL COMMENT '邮件接收时间', `title` varchar(200) NOT NULL COMMENT '邮件标题', `mail_from` varchar(100) DEFAULT NULL, `content` text COMMENT '邮件内容', `attachment` varchar(400) DEFAULT NULL COMMENT '邮件附件文件', `parse_time` datetime DEFAULT NULL COMMENT '解析时间', `status` int(11) DEFAULT NULL COMMENT '状态:-1:失败,0:正常; -2: 文件大小为0', PRIMARY KEY (`receive_time`,`title`) )
mail_record_history表的每条记录对应一份邮件,邮件接受时间和邮件标题作为主键。 通过表字段receive_time的最大值来作为增量解析邮件的标准是有缺陷的。 python的mail模块接口没找到指定日期后的邮件,每次都是取全量的邮件序号,从最新的邮件开始解析,如果程序一切顺利(几乎不可能),那是没有问题的。 但是,只有出现一次错误,有可能是网络超时,有可能是邮件服务器不响应,有可能是解析服务器故障,就会出现从最新日期到数据库邮件最大日期之间丢失邮件。 而且下次再触发邮件解析时无法从中断处连续。 这里,我们用redis来存储最大邮件解析的时间点。
1 2 3 4 5 6 7 8 9 10 11 12 13 REDIS_PARAMS = { 'host': "192、168.1.111", 'port': 6379, 'password': 'xxxxx', 'db': 14, } def get_redis_client(): r = redis.Redis(host=REDIS_PARAMS['host'], port=REDIS_PARAMS['port'], password=REDIS_PARAMS['password'], db=REDIS_PARAMS['db']) return r redis_client = get_redis_client() REDIS_KEY = "max_mail_recieve_time"
每次解析先获取数据库中最新的邮件时间
1 2 3 4 5 6 7 8 9 10 11 12 13 def get_max_mail_recieve_time(): """ 获取数据库最新邮件时间 :return: """ max_receive_time = redis_client.get(REDIS_KEY) if max_receive_time is None or max_receive_time == 'None': max_receive_time = "2020-01-01 00:00:00" # redis_client.set(REDIS_KEY, max_receive_time) if isinstance(max_receive_time, bytes): max_receive_time = str(max_receive_time, encoding='utf-8') return max_receive_time
从最新邮件开始解析,当邮件时间小于数据库最新时间时,就终止解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import arrow max_recieve_time = get_max_mail_recieve_time() max_mail_time_str = None for i in range(mail_nums): print("mail: {}/{}".format(i+1, mail_nums)) resp, data = conn.fetch(mails_list[i], '(RFC822)') emailbody = data[0][1] mail = email.message_from_bytes(emailbody) mail_datetime = parse_mail_time(mail.get("date")) if arrow.get(mail_datetime) < arrow.get(max_recieve_time): return if i == 0: max_mail_time_str = arrow.get(mail_datetime).format("YYYY-MM-DD HH:mm")
当所有邮件都解析成功时,才更新redis的数据库最新时间(REDIS_KEY)。
1 2 if max_mail_time_str: redis_client.set(REDIS_KEY, max_mail_time_str)
五、邮件正文解析 1 2 3 4 5 6 7 8 mail_body = decode_data(get_body(mail)) # 解析邮件内容 def get_body(msg): if msg.is_multipart(): return get_body(msg.get_payload(0)) else: return msg.get_payload(None,decode=True)
六、邮件附件下载 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 MAIL_DIR = '/tmp' mail_date_str = '2020-06-09' # 获取邮件附件 fileNames = [] for part in mail.walk(): fileName = part.get_filename() # 如果文件名为纯数字、字母时不需要解码,否则需要解码 try: fileName = decode_header(fileName)[0][0].decode(decode_header(fileName)[0][1]) except: pass # 如果获取到了文件,则将文件保存在制定的目录下 if fileName: dirPath = os.path.join(MAIL_DIR, mail_date_str) os.system("chmod -R 777 {}".format(dirPath)) if not os.path.exists(dirPath): os.makedirs(dirPath) filePath = os.path.join(dirPath, fileName) try: if not os.path.isfile(filePath): fp = open(filePath, 'wb') fp.write(part.get_payload(decode=True)) fp.close() print("附件下载成功,文件名为:" + fileName) else: print("附件已经存在,文件名为:" + fileName) except Exception as e: print(e)