最新工作很忙,需要做一个整体项目和版本排期的大表。幸亏之前学了一点Python,尝试用Python处理数学,生成所需要表格。主要用到了pandas、xlsxwriter等工具包。代码并不具备什么通用性,可能写的也很啰嗦,不过好在也算用Python为自己做了一项实用的工作。下面代码中,一些工作信息隐去了。
from datetime import datetime
import openpyxl
import pandas as pd
file_path = "./resources/系统版本计划统筹表.xlsx"
input_sheet_name = "3-项目版本统筹"
out_file_path = "./resources/"
out_sheet_name = "版本排布-总览"
out_sheet_name_trans_withoutxc = "**"
col_name_product_trade = "**"
col_name_product_clear = "**"
col_name_product_delivery = "**"
col_name_product_supervise = "**"
col_name_product_member = "**"
col_name_product_otc = "**"
col_name_product_data = "**"
col_name_product_risk_management = "**"
col_name_product_other = "**"
col_name_product_line = "**"
col_name_index = "**"
col_name_project_name = "**"
col_name_project_simple_name = "**"
col_name_project_approval_state = "**"
col_name_project_stage = "**"
col_name_if_xc = "**"
col_name_expected_time = "**"
col_name_time_line = "**"
col_name_online_time = "**"
col_name_if_decision = "**"
col_name_explanation = "**"
col_name_system_trade = "**"
col_name_system_clear = "**"
col_name_system_clear_old = "**"
col_name_system_delivery = "**"
col_name_system_supervise = "**"
col_name_system_otc = "**"
col_name_system_member = "**"
col_name_system_index = "**"
col_name_system_dwr = "**"
col_name_system_risk_management = "**"
col_name_system_data = "**"
col_name_system_data_old = "**"
col_name_system_website = "**"
col_name_system_deliveryapp = "**"
col_name_system_bean2 = "**"
col_name_new_project_count = "**"
# 读取Excel
def read_excel(file_path, sheet_name):
workbook = openpyxl.load_workbook(filename=file_path, data_only=True)
return pd.read_excel(workbook, sheet_name=sheet_name, engine='openpyxl')
# 定义输出表格
def create_out_frame():
columns = [col_name_product_trade, col_name_product_clear, col_name_product_delivery, col_name_product_supervise,
col_name_product_member, col_name_product_otc, col_name_product_data,
col_name_product_other, col_name_time_line, col_name_online_time, col_name_system_trade,
col_name_system_clear, col_name_system_clear_old, col_name_system_delivery, col_name_system_supervise,
col_name_system_otc, col_name_system_member, col_name_system_index, col_name_system_dwr,
col_name_system_risk_management, col_name_system_data, col_name_system_data_old, col_name_system_website,
col_name_system_deliveryapp, col_name_system_bean2
]
result = pd.DataFrame(columns=columns)
return result
def fill_the_frame(source_df, result_frame):
# 1.对读入数据按照上线时间排序
source_df.sort_values(by=col_name_online_time, inplace=True)
# 2.插入上线时间列
result_frame[col_name_online_time] = pd.to_datetime(source_df[col_name_online_time],
errors='coerce').dt.strftime('%m/%d')
result_frame[col_name_time_line] = pd.to_datetime(source_df[col_name_online_time],
errors='coerce').dt.strftime('%Y-%m')
# 3.插入版本号列
result_frame[col_name_system_trade] = source_df[col_name_system_trade]
result_frame[col_name_system_clear] = source_df[col_name_system_clear]
result_frame[col_name_system_clear_old] = source_df[col_name_system_clear_old]
result_frame[col_name_system_delivery] = source_df[col_name_system_delivery]
result_frame[col_name_system_supervise] = source_df[col_name_system_supervise]
result_frame[col_name_system_otc] = source_df[col_name_system_otc]
result_frame[col_name_system_member] = source_df[col_name_system_member]
result_frame[col_name_system_index] = source_df[col_name_system_index]
result_frame[col_name_system_dwr] = source_df[col_name_system_dwr]
result_frame[col_name_system_risk_management] = source_df[col_name_system_risk_management]
result_frame[col_name_system_data] = source_df[col_name_system_data]
result_frame[col_name_system_data_old] = source_df[col_name_system_data_old]
result_frame[col_name_system_website] = source_df[col_name_system_website]
result_frame[col_name_system_deliveryapp] = source_df[col_name_system_deliveryapp]
result_frame[col_name_system_bean2] = source_df[col_name_system_bean2]
result_frame[col_name_project_approval_state] = source_df[col_name_project_approval_state]
# 4.填充项目列
for row in source_df.itertuples():
cur_product_line_name = row.产品线
result_frame.at[(row.Index,), cur_product_line_name] = row.简称
# 按行合并
def merge_row(row_num, format, df, sheet):
time_line_row = df.iloc[row_num]
start_cell_index = 1
end_cell_index = 1
last_cell_val = None
total_count = len(time_line_row)
for cell in time_line_row:
# 刚合并过
if start_cell_index == end_cell_index:
if end_cell_index == total_count:
# 处理最后一个单元格
sheet.write(row_num, end_cell_index, cell, format)
else:
end_cell_index += 1
last_cell_val = cell
continue
# 如果值相同,索引+1
if cell == last_cell_val:
# 处理最后一个单元格
if end_cell_index == total_count:
sheet.merge_range(row_num, start_cell_index + 1, row_num, end_cell_index + 1,
cell,
format)
else:
end_cell_index += 1
else:
# 单一值单元格
if end_cell_index - start_cell_index <= 1:
sheet.write(row_num, start_cell_index + 1, last_cell_val, format)
if end_cell_index == total_count:
sheet.write(row_num, end_cell_index + 1, cell, format)
else:
sheet.merge_range(row_num, start_cell_index + 1, row_num, end_cell_index,
last_cell_val,
format)
start_cell_index = end_cell_index
end_cell_index += 1
last_cell_val = cell
def write_result(writer, result_frame):
result_frame.dropna(axis=0, how="all", inplace=True)
result_frame.dropna(axis=1, how="all", inplace=True)
# 总览
_trans_to_sheet(result_frame, writer, out_sheet_name, True)
# 交易
trade_frame = result_frame.drop(axis=1,
columns=[col_name_product_clear, col_name_product_delivery,
col_name_product_supervise,
col_name_product_member, col_name_product_data, col_name_product_otc,
col_name_product_other])
trade_frame.dropna(inplace=True, how="any", subset=[col_name_product_trade])
_trans_to_sheet(trade_frame, writer, "1-交易", False)
# 清算
trade_frame = result_frame.drop(axis=1,
columns=[col_name_product_trade, col_name_product_delivery,
col_name_product_supervise,
col_name_product_member, col_name_product_data, col_name_product_otc,
col_name_product_other])
trade_frame.dropna(inplace=True, how="any", subset=[col_name_product_clear])
_trans_to_sheet(trade_frame, writer, "2-清算", False)
# 交割
trade_frame = result_frame.drop(axis=1,
columns=[col_name_product_trade, col_name_product_clear,
col_name_product_supervise,
col_name_product_member, col_name_product_data, col_name_product_otc,
col_name_product_other])
trade_frame.dropna(inplace=True, how="any", subset=[col_name_product_delivery])
_trans_to_sheet(trade_frame, writer, "3-交割", False)
# 监查
trade_frame = result_frame.drop(axis=1,
columns=[col_name_product_trade, col_name_product_clear,
col_name_product_delivery,
col_name_product_member, col_name_product_data, col_name_product_otc,
col_name_product_other])
trade_frame.dropna(inplace=True, how="any", subset=[col_name_product_supervise])
_trans_to_sheet(trade_frame, writer, "4-监查", False)
# 场外
trade_frame = result_frame.drop(axis=1,
columns=[col_name_product_trade, col_name_product_clear,
col_name_product_delivery,
col_name_product_member, col_name_product_data, col_name_product_supervise,
col_name_product_other])
trade_frame.dropna(inplace=True, how="any", subset=[col_name_product_otc])
_trans_to_sheet(trade_frame, writer, "5-场外", False)
# 会服
trade_frame = result_frame.drop(axis=1,
columns=[col_name_product_trade, col_name_product_clear,
col_name_product_delivery,
col_name_product_otc, col_name_product_data, col_name_product_supervise,
col_name_product_other])
trade_frame.dropna(inplace=True, how="any", subset=[col_name_product_member])
_trans_to_sheet(trade_frame, writer, "6-会服", False)
# 数据
trade_frame = result_frame.drop(axis=1,
columns=[col_name_product_trade, col_name_product_clear,
col_name_product_delivery,
col_name_product_otc, col_name_product_member, col_name_product_supervise,
col_name_product_other])
trade_frame.dropna(inplace=True, how="any", subset=[col_name_product_data])
_trans_to_sheet(trade_frame, writer, "7-数据", False)
# 其他
trade_frame = result_frame.drop(axis=1,
columns=[col_name_product_trade, col_name_product_clear,
col_name_product_delivery,
col_name_product_otc, col_name_product_member, col_name_product_supervise,
col_name_product_data])
trade_frame.dropna(inplace=True, how="any", subset=[col_name_product_other])
_trans_to_sheet(trade_frame, writer, "8-其他", False)
def _trans_to_sheet(df, writer, sheet_name, is_all):
final_result_frame = df.drop(axis=1, columns=[col_name_project_approval_state])
ori_trans_frame = df.transpose(copy=True)
ori_trans_frame.set_index(ori_trans_frame.index, append=True, inplace=True)
# 5.转置输出
result_frame_trans = final_result_frame.transpose(copy=True)
# 加一列索引列
result_frame_trans.set_index(result_frame_trans.index, append=True, inplace=True)
# 6.处理时间线列-合并单元格
result_frame_trans.to_excel(writer, sheet_name=sheet_name, header=False)
out_sheet = writer.sheets[sheet_name]
sheet_table = out_sheet.table
row_counts = len(sheet_table)
# 交替设置行样式
odd_row_format = writer.book.add_format({
"border": 1,
"fg_color": "D9D9D9",
"border_color": "808080",
"font_name": "微软雅黑",
"bold": True,
"align": "center",
"valign": "vcenter",
})
even_row_format = writer.book.add_format({
"border": 1,
"fg_color": "F2F2F2",
"border_color": "808080",
"font_name": "微软雅黑",
"bold": True,
"align": "center",
"valign": "vcenter",
})
project_cell_format = writer.book.add_format({
"border": 1,
"fg_color": "4F81BD",
"border_color": "808080",
"font_name": "Times New Roman",
"font_color": "white",
"bold": True,
"align": "center",
"valign": "vcenter",
"text_wrap": True
})
new_project_cell_format = writer.book.add_format({
"border": 1,
"fg_color": "00B050",
"border_color": "808080",
"font_name": "Times New Roman",
"font_color": "white",
"bold": True,
"align": "center",
"valign": "vcenter",
"text_wrap": True
})
version_cell_format = writer.book.add_format({
"border": 1,
"fg_color": "BFBFBF",
"border_color": "808080",
"font_name": "Times New Roman",
"font_color": "black",
"bold": True,
"align": "center",
"valign": "vcenter",
"text_wrap": True
})
merge_cell_format = writer.book.add_format({
"bold": True,
"border": 2,
"align": "center",
"valign": "vcenter",
"fg_color": "1F497D",
"font_color": "white",
"font_name": "Times New Roman",
"border_color": "808080"
})
time_line_row_num = 0
online_time_row_num = 0
if is_all:
time_line_row_num = 8
online_time_row_num = 9
else:
time_line_row_num = 1
online_time_row_num = 2
for row_index in range(row_counts):
table_row = sheet_table[row_index]
cur_cell_format = None
if row_index % 2 == 0:
out_sheet.set_row(row=row_index, height=30, cell_format=even_row_format)
else:
out_sheet.set_row(row=row_index, height=30, cell_format=odd_row_format)
if time_line_row_num <= row_index <= online_time_row_num:
cur_cell_format = merge_cell_format
elif row_index > online_time_row_num:
cur_cell_format = version_cell_format
for col_idx in table_row:
if col_idx == 0:
continue
if col_idx == 1:
value = ori_trans_frame.index[row_index][0]
cur_idx_format = None
if row_index % 2 == 0:
cur_idx_format = even_row_format
else:
cur_idx_format = odd_row_format
out_sheet.write(row_index, col_idx, value, cur_idx_format)
else:
if row_index < time_line_row_num:
cur_cell_format = project_cell_format
if_new_project = ori_trans_frame.iloc[row_counts, col_idx - 2]
if if_new_project == "待立项":
cur_cell_format = new_project_cell_format
value = ori_trans_frame.iloc[row_index, col_idx - 2]
out_sheet.write(row_index, col_idx, value, cur_cell_format)
# 合并项目、系统和时间轴
if is_all:
out_sheet.merge_range(0, 0, time_line_row_num - 1, 0,
"项目",
merge_cell_format)
else:
out_sheet.write(0, 0, "项目", merge_cell_format)
out_sheet.merge_range(time_line_row_num, 0, online_time_row_num, 1,
"时间轴",
merge_cell_format)
out_sheet.merge_range(online_time_row_num + 1, 0, row_counts - 1, 0,
"系统",
merge_cell_format)
merge_row(time_line_row_num, format=merge_cell_format, sheet=out_sheet, df=result_frame_trans)
merge_row(online_time_row_num, format=merge_cell_format, sheet=out_sheet, df=result_frame_trans)
# 7.设置样式
# 列宽
out_sheet.set_column(0, 0, width=5)
out_sheet.set_column(1, 1, width=20)
def write_detail_table(source_df, writer):
# 定义输出的DataFrame
result_frame = create_out_frame()
# 遍历填写数据1
fill_the_frame(source_df, result_frame)
# 输出结果
write_result(writer, result_frame)
def run():
currentDateAndTime = datetime.now()
time_str = currentDateAndTime.strftime("%Y%m%d(%H%M)")
out_file = out_file_path + time_str + "版本排布.xlsx"
writer = pd.ExcelWriter(out_file,
engine='xlsxwriter',
date_format='yyyy-dd-mm')
# 读取源数据到DataFrame
source_df = read_excel(file_path, input_sheet_name)
# 输出明细数据
write_detail_table(source_df, writer)
# 输出统计数据
# write_statistic_tables(source_df, writer)
writer.close()
run()