Python处理Excel和MySQL数据

. . .

MySQL数据的处理和Excel数据的读取

下面的示例演示 MySQL 数据的处理和 Excel 数据的读取(Excel 数据的写入示例见文末):

# encoding=utf-8

import pandas as pd
import pymysql
import math
import traceback


class DB:
    """Small wrapper around a PyMySQL connection for the label/material tables.

    Usable as a context manager: the cursor and the connection are closed when
    the ``with`` block exits. The write helpers never commit or roll back; the
    caller owns the transaction through ``self.conn``.

    All statements use driver-side parameter binding (``%s`` placeholders plus
    an argument tuple) so that values are escaped by PyMySQL rather than being
    spliced into the SQL text.
    """

    def __init__(self, host='localhost', port=3306, db='', user='root', passwd='root', charset='utf8'):
        """Open the connection and create the cursor shared by all helpers."""
        self.conn = pymysql.connect(host=host, port=port, db=db, user=user, passwd=passwd, charset=charset)
        # Default (tuple) cursor: every helper below indexes rows by position,
        # so this must not be switched to DictCursor without updating them.
        self.cur = self.conn.cursor()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the cursor, then the connection; never suppresses exceptions."""
        try:
            self.cur.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self.conn.close()

    def get_label_id(self, label_name):
        """Return column 0 of the first ``label`` row whose name matches.

        Returns ``None`` when nothing matches or when the query fails (the
        failure is printed together with its traceback).
        """
        sql = "SELECT * FROM label WHERE name = %s"
        try:
            self.cur.execute(sql, (label_name,))
            rows = self.cur.fetchall()
            if rows:
                return rows[0][0]
        except Exception:
            print("Error: unable to fetch data, label_name=%s" % label_name)
            traceback.print_exc()
        return None

    def get_label_dimension_id(self, label_dimension_name):
        """Return column 0 of the first ``label_dimension`` row whose name matches.

        Returns ``None`` when nothing matches or when the query fails (the
        failure is printed together with its traceback).
        """
        sql = "SELECT * FROM label_dimension WHERE name = %s"
        try:
            self.cur.execute(sql, (label_dimension_name,))
            rows = self.cur.fetchall()
            if rows:
                return rows[0][0]
        except Exception:
            print("Error: unable to fetch data, label_dimension_name=%s" % label_dimension_name)
            traceback.print_exc()
        return None

    def iter_all_material_id_by_game_id(self, game_id=1827):
        """Yield column 0 of every ``material`` row belonging to ``game_id``.

        All rows are fetched before the first value is yielded, so the shared
        cursor may be reused by the consumer while iterating. On a query
        failure the error is printed and nothing is yielded.
        """
        sql = "SELECT * FROM material WHERE game_id = %s"
        try:
            self.cur.execute(sql, (game_id,))
            rows = self.cur.fetchall()
        except Exception:
            print("Error: unable to fetch data, game_id=%s" % game_id)
            traceback.print_exc()
            return
        # Yield outside the try block so that closing the generator early is
        # not mistaken for a query failure.
        for row in rows:
            yield row[0]

    def clear_label_by_material_id(self, material_id):
        """Delete every ``label_material_rel`` row for ``material_id``.

        Does not commit. Failures are printed (with traceback) and swallowed.
        """
        sql = "DELETE FROM label_material_rel WHERE material_id = %s"
        args = (material_id,)
        try:
            # Log the statement exactly as the driver will send it.
            print(self.cur.mogrify(sql, args))
            self.cur.execute(sql, args)
        except Exception:
            print("Error: clear_label_by_material_id material_id=%s" % material_id)
            traceback.print_exc()

    def get_label_name_2_id_map(self):
        """Return ``{row[1]: row[0]}`` for every row of ``label``.

        Assumes column 0 is the id and column 1 the name (``SELECT *`` order).
        On a query failure the error is printed and whatever was collected so
        far (normally an empty dict) is returned.
        """
        sql = "SELECT * FROM label"
        ret_map = {}
        try:
            self.cur.execute(sql)
            for row in self.cur.fetchall():
                ret_map[row[1]] = row[0]
        except Exception:
            print("Error: get_label_name_2_id_map")
            traceback.print_exc()
        return ret_map

    def get_label_dimension_name_2_id_map(self, game_id=1827):
        """Return ``{row[1]: row[0]}`` for the ``label_dimension`` rows of ``game_id``.

        Assumes column 0 is the id and column 1 the name (``SELECT *`` order).
        On a query failure the error is printed and whatever was collected so
        far (normally an empty dict) is returned.
        """
        sql = "SELECT * FROM label_dimension where game_id = %s"
        ret_map = {}
        try:
            self.cur.execute(sql, (game_id,))
            for row in self.cur.fetchall():
                ret_map[row[1]] = row[0]
        except Exception:
            print("Error: get_label_dimension_name_2_id_map")
            traceback.print_exc()
        return ret_map

    def update_label_material_rel(self, label_id, material_id):
        """Insert one ``(label_id, material_id)`` row into ``label_material_rel``.

        Does not commit. Failures are printed (with traceback) and swallowed.
        """
        sql = (
            "INSERT INTO label_material_rel (label_id, material_id) "
            "VALUES (%s, %s)"
        )
        args = (label_id, material_id)
        try:
            # Log the statement exactly as the driver will send it.
            print(self.cur.mogrify(sql, args))
            self.cur.execute(sql, args)
        except Exception:
            print("Error: update_label_material_rel (label_id=%s, material_id=%s)" % (label_id, material_id))
            traceback.print_exc()


if __name__ == '__main__':
    # Re-import the label <-> material relations for one game from a spreadsheet:
    # existing relations are cleared, then one relation is inserted per
    # (row id, comma-separated label name) pair. Everything runs in a single
    # transaction that is rolled back if any step raises.
    #
    # NOTE(review): the connection credentials are hard-coded; consider loading
    # them from the environment or from a config file kept out of version control.
    with DB(host='42.186.102.210', port=9110, user='fba_user', db='fba', passwd='fba.edt') as db:
        df = pd.read_excel('pending_proc.xlsx')

        # Columns that are not imported; tolerate sheets where some are absent.
        exclude_col_names = {'视频编码', '提交时间', '负责人'}
        col_list = [col for col in df.columns.to_list() if col not in exclude_col_names]

        # One dict per spreadsheet row, keyed by column name.
        pending_import_data = df[col_list].to_dict(orient='records')

        try:
            for m_id in db.iter_all_material_id_by_game_id():
                db.clear_label_by_material_id(m_id)

            _label_name_2_id_map = db.get_label_name_2_id_map()
            for _row_data in pending_import_data:
                # Rows without an id cannot be linked to a material.
                if pd.isna(_row_data['id']):
                    continue
                _material_id = int(_row_data['id'])
                for col_name, label_name_list_str in _row_data.items():
                    # Skip the id column itself and empty cells (NaN would
                    # otherwise be looked up as the label name "nan").
                    if col_name == 'id' or pd.isna(label_name_list_str):
                        continue
                    for _label in str(label_name_list_str).split(','):
                        # Strip each name so "a, b" matches both labels.
                        _label_id = _label_name_2_id_map.get(_label.strip())
                        if _label_id is None:
                            continue
                        db.update_label_material_rel(_label_id, _material_id)

            # Make the whole import visible at once.
            db.conn.commit()
        except Exception as e:
            traceback.print_exc()
            print(e)
            db.conn.rollback()

Excel数据的写入

import pandas as pd

# Sample 4x4 matrix to write; each inner list is one spreadsheet row.
# (Values of this kind can be produced with numpy's np.random.random((4, 4)).)
pending_write_data = [
    [0.01197952, 0.49298068, 0.76897227, 0.76737911],
    [0.82765185, 0.70592612, 0.07624629, 0.41206483],
    [0.96358469, 0.14713271, 0.34609823, 0.45701923],
    [0.87425111, 0.18451819, 0.55125788, 0.50195541],
]

# Pass index=['exp1', 'exp2', 'exp3', 'exp4'] to name the rows; when omitted,
# the default 0, 1, 2, ... index is used.
frame = pd.DataFrame(
    pending_write_data,  # row data to write, one list per row
    columns=['jan2015', 'Fab2015', 'Mar2015', 'Apr2005'],  # column headers
)
print(frame)
frame.to_excel(
    "data2.xlsx",
    index=False,  # do not write the index column to the sheet
)

requirements.txt的自动生成

使用 pigar 自动生成(先通过 `pip install pigar` 安装)。

生成的 requirements.txt 示例:

# Automatically generated by https://github.com/damnever/pigar.

# E:\working\doc\sdc\import_label_material.py: 4
PyMySQL == 0.10.1

# E:\working\doc\sdc\test.py: 2
numpy == 1.19.1

# E:\working\doc\sdc\import_label_material.py: 3
# E:\working\doc\sdc\test.py: 1
pandas == 0.25.1