注:本文所用数据均来自百度开源数据
Admin_Log
数据会在下方展示下载按钮,请自行提取。
本文中使用Python – 面向对象 – 数据分析 – 每日销售额项目中的data_define.py与file_define.py进行复写。
本文需要用到pymysql模块,请自行在PyCharm中添加。
项目介绍:将文件内的数据写入到MySQL中,后续将写入MySQL中的数据读取并保存到本地。
Admin_log
实现步骤:
1. 设计一个类,可以完成数据的封装
2. 设计一个抽象类,定义文件读取的相关功能,并使用子类实现具体操作
3. 读取文件,生产数据对象
4. 将数据写入到MySQL中
5. 将MySQL中的数据读取并保存到本地
代码模块分析:
data_define.py:数据定义封装类
file_define.py:文件定义封装类
main.py:主逻辑代码[ 数据写入MySQL ]
read_write.py:读取MySQL数据将数据写出到本地
写入数据成品展示:
写出数据成品展示:
data_define.py:
""" 数据定义的类 """ class Record: def __init__(self, data, order_id, money, province): self.date = data # 订单日期 self.order_id = order_id # 订单ID self.money = money # 订单金额 self.province = province # 销售省份 def __str__(self): return f"{self.date}, {self.order_id}, {self.money}, {self.province}"
file_define.py:
""" 和文件相关的类定义 """ import json # 先定义一个抽象类,用来做顶层设计,确定有哪些功能需要实现 from data_define import Record class FileReader: def read_data(self) -> list[Record]: """读取文件数据,读到的每一条数据都转换为Record对象,将它们都封装到list内返回即可""" pass class TextFileReader(FileReader): def __init__(self, path): self.path = path # 定义成员变量,记录文件的路径 # 复写(实现抽象方法)父类的方法 def read_data(self) -> list[Record]: f = open(self.path, "r", encoding="UTF-8") record_list: list[Record] = [] for line in f.readlines(): # print(line) line = line.strip() # 消除读取到的每一行数据中的"\n" # print(line) data_list = line.split(",") # print(data_list) record = Record(data_list[0], data_list[1], int(data_list[2]), data_list[3]) record_list.append(record) f.close() return record_list class JsonFileReader(FileReader): def __init__(self, path): self.path = path # 定义成员变量,记录文件的路径 def read_data(self) -> list[Record]: f = open(self.path, "r", encoding="UTF-8") record_list: list[Record] = [] for line in f.readlines(): data_dict = json.loads(line) record = Record(data_dict["date"], data_dict["order_id"], int(data_dict["money"]), data_dict["province"]) record_list.append(record) # print(record_list) f.close() return record_list if __name__ == '__main__': text_file_reader = TextFileReader("E:/PycharmProjects/面向对象数据/2011年1月销售数据.txt") json_file_reader = JsonFileReader("E:/PycharmProjects/面向对象数据/2011年2月销售数据JSON.txt") list1 = text_file_reader.read_data() list2 = json_file_reader.read_data() for l in list1: print(l) # print("==========") for l in list2: print(l)
main.py:
""" SQL综合案例,读取文件,写入到MySQL数据库中 """ import pymysql from file_define import FileReader, TextFileReader, JsonFileReader from data_define import Record # from pyecharts.charts import Bar # from pyecharts.options import * # from pyecharts.globals import ThemeType from pymysql import Connection text_file_reader = TextFileReader("E:/PycharmProjects/面向对象数据/2011年1月销售数据.txt") json_file_reader = JsonFileReader("E:/PycharmProjects/面向对象数据/2011年2月销售数据JSON.txt") jan_data: list[Record] = text_file_reader.read_data() feb_data: list[Record] = json_file_reader.read_data() # 将两个月份的数据合并为一个list存储 all_data: list[Record] = jan_data + feb_data # print(all_data) # 构建MySQL链接对象 conn = Connection( host='localhost', # 主机名(IP) port=3306, # 端口 user='root', # 账户(用户名) password='123456', # 密码 autocommit=True # 自动提交(确认)(事务) ) # 获得游标对象 cursor = conn.cursor() # 选择数据库 conn.select_db('py_sql') # 组织SQL语句 for record in all_data: sql = f"insert into orders(order_date, order_id, money, province) "\ f"values('{record.date}', '{record.order_id}', {record.money}, '{record.province}')" # print(sql) cursor.execute(sql) # 执行SQL语句 # 关闭MySQL链接对象 conn.close()
read_write.py:
import json from pymysql import Connection conn = Connection( host="localhost", port=3306, user="root", password="123456", ) cursor = conn.cursor() conn.select_db("py_sql") cursor.execute("select * from orders") data_tuple = cursor.fetchall() f = open("E:\PycharmProjects\Python操作MySQL基础使用\读取写出操作.txt", "a", encoding="UTF-8") data_dict = {} for record in data_tuple: data_dict["data"] = str(record[0]) data_dict["order_id"] = record[1] data_dict["money"] = int(record[2]) data_dict["province"] = record[3] f.write(json.dumps(data_dict, ensure_ascii=False)) f.write("\n") f.close() conn.close()