Files
inventory_check/inventory_check/views.py
2025-02-25 18:52:49 +08:00

175 lines
5.5 KiB
Python

# -*- coding: utf-8 -*-
"""
:author: luffmims
:url: http://yum2.cc
:copyright: © 2025 luffmims <luffmims@hotmaill.com>
:license: MIT, see LICENSE for more details.
"""
import json
import os
import urllib.request
from io import BytesIO

from flask import jsonify, redirect, request, url_for, render_template, send_file
from flask import make_response, session
from openpyxl import Workbook
from werkzeug.exceptions import BadRequest

from inventory_check import app, db
from inventory_check.models import Inventory, import_from_xlsx
@app.route('/')
def index():
    """Render and return the ``index.html`` template for the root URL."""
    page = render_template('index.html')
    return page
@app.route('/list')
def list():
    """Render and return the ``list.html`` template for ``/list``.

    NOTE(review): this view's name shadows the builtin ``list`` for the
    rest of the module. It is left as-is because the function name is the
    view's public name (and presumably its endpoint name) -- confirm no
    template or caller depends on it before renaming.
    """
    page = render_template('list.html')
    return page
@app.route('/check', methods=['GET'])
def check():
    """Render the check page for the first inventory row not yet checked.

    Returns:
        The rendered ``check.html`` template with the row passed as
        ``inventory``, or the plain string ``"All checked"`` when the query
        for ``checked=False`` yields no row.
    """
    inventory = Inventory.query.filter_by(checked=False).first()
    # Trace through the application logger rather than a bare print() so the
    # message is level-filtered and does not leak onto stdout in production.
    app.logger.debug("Next unchecked inventory: %s", inventory)
    if not inventory:
        return "All checked"
    return render_template('check.html', inventory=inventory)
@app.route('/get_inventory', methods=['POST'])
def update_inventory():
    """Mark one inventory row as checked and return the row that follows it.

    Expects a JSON object with the keys ``id``, ``amount`` and ``name``.
    The row with that id gets ``amount`` and ``name`` overwritten and
    ``checked`` set to True; the change is committed, then the row with
    ``id + 1`` is looked up and returned.

    Returns:
        200 with a JSON object holding the next row's ``id``, ``tag`` and
        ``amount`` plus ``checkedone`` (tag of the row just updated) and
        ``checkedamount`` (the submitted amount);
        400 with ``{"error": ...}`` for malformed input or a missing row;
        500 with ``{"error": "Internal server error"}`` for anything else.
    """
    try:
        # Validate the payload up front so that client mistakes produce a 400
        # instead of surfacing as KeyError/TypeError in the generic handler.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            raise BadRequest('Request body must be a JSON object')

        inventory_id = data.get('id')
        amount = data.get('amount')
        # Only None / empty string count as "missing": an amount of 0 is a
        # valid count and must not be rejected.
        if inventory_id in (None, '') or amount in (None, ''):
            raise BadRequest('Missing inventory id or amount')
        if 'name' not in data:
            raise BadRequest('Missing name')
        name = data['name']

        # Normalise the id to int before touching the database; otherwise a
        # string id would blow up on ``+ 1`` *after* the commit below.
        try:
            inventory_id = int(inventory_id)
        except (TypeError, ValueError):
            raise BadRequest('Inventory id must be an integer') from None

        # Look up and update the inventory row.
        inventory = Inventory.query.get(inventory_id)
        if not inventory:
            raise BadRequest('Inventory not found')
        checked_tag = inventory.tag
        inventory.amount = amount
        inventory.name = name
        inventory.checked = True
        db.session.commit()

        # Fetch the following row for the client to check next.
        # NOTE(review): when the updated row is the last one, this reports
        # 'Inventory not found' with status 400 even though the update above
        # was committed -- kept unchanged because the client contract is not
        # visible here; confirm how the front end treats this response.
        next_inventory = Inventory.query.get(inventory_id + 1)
        if not next_inventory:
            raise BadRequest('Inventory not found')

        return jsonify({
            'id': next_inventory.id,
            'tag': next_inventory.tag,
            'amount': next_inventory.amount,
            'checkedone': checked_tag,
            'checkedamount': amount
        })
    except BadRequest as e:
        return jsonify({'error': str(e)}), 400
    except Exception:
        # Boundary handler: undo any pending change and record the traceback
        # instead of swallowing it silently.
        db.session.rollback()
        app.logger.exception('Failed to update inventory')
        return jsonify({'error': 'Internal server error'}), 500
@app.route('/upload', methods=['GET', 'POST'])
def upload_form():
    """Render the ``upload.html`` template for both GET and POST requests.

    The POST branch is a placeholder: this view performs no upload handling
    of its own.
    """
    is_post = request.method == 'POST'
    if is_post:
        # TODO: file upload logic is not implemented in this view.
        pass
    return render_template('upload.html')
@app.route('/import', methods=['POST'])
def import_file():
    """Save the uploaded ``file`` field and import it via ``import_from_xlsx``.

    Returns:
        A short HTML success message once the file has been saved and
        imported, or a redirect to the ``upload_form`` view when the
        uploaded file object is falsy.

    NOTE(review): every upload is written to the same relative path
    ``data.xlsx``; concurrent uploads would overwrite each other -- confirm
    whether anything else reads that file before switching to a temp file.
    """
    uploaded = request.files['file'] if request.method == 'POST' else None
    if not uploaded:
        return redirect(url_for('upload_form'))

    target_path = 'data.xlsx'
    uploaded.save(target_path)
    import_from_xlsx(target_path)
    return '文件导入成功! <a href="/">返回</a>'
@app.route('/download-xlsx')
def download_xlsx():
    """Export all Inventory rows as an .xlsx attachment built in memory.

    The workbook has one sheet titled ``Inventory`` with a header row
    followed by one row per record.

    Returns:
        A response carrying the workbook bytes, served as the attachment
        ``example.xlsx``.
    """
    # Header labels double as the attribute names read from each record.
    columns = ("sort", "location", "tag", "amount", "checked", "name")

    # NOTE(review): no explicit ordering is applied, so rows appear in
    # whatever order the query returns them.
    records = Inventory.query.all()

    workbook = Workbook()
    sheet = workbook.active
    sheet.title = "Inventory"
    sheet.append([*columns])
    for record in records:
        sheet.append([getattr(record, column) for column in columns])

    buffer = BytesIO()
    workbook.save(buffer)
    buffer.seek(0)  # rewind so the response streams from the first byte

    attachment = send_file(
        buffer,
        mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        as_attachment=True,
        download_name='example.xlsx',
    )
    response = make_response(attachment)
    response.headers["Content-Disposition"] = "attachment; filename=example.xlsx"
    return response
# Feishu application credentials.
# SECURITY: these values were committed to source control. They can now be
# overridden through the FEISHU_APP_ID / FEISHU_APP_SECRET environment
# variables; the literals remain only as backward-compatible defaults and the
# secret should be rotated and removed from the repository.
APP_ID = os.environ.get("FEISHU_APP_ID", "cli_a70da4f71bf1d00c")
APP_SECRET = os.environ.get("FEISHU_APP_SECRET", "4zV9xcunWUE8E4kKS4hw5e3m6pgjoduz")
# Obtain the app_access_token.
def get_app_access_token():
    """Request an app access token from Feishu using APP_ID / APP_SECRET.

    The original code called ``request.post`` on Flask's incoming-request
    object, which has no such method; the call is now a real HTTP POST made
    with the standard library.

    Returns:
        The ``app_access_token`` value from the JSON reply, or None when the
        reply does not contain that key.

    Raises:
        urllib.error.URLError: on network failure or a non-2xx HTTP status.
    """
    url = "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal/"
    body = json.dumps({
        "app_id": APP_ID,
        "app_secret": APP_SECRET,
    }).encode("utf-8")
    http_request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json; charset=utf-8"},
        method="POST",
    )
    # Bounded timeout so a stalled upstream cannot hang the worker forever.
    with urllib.request.urlopen(http_request, timeout=10) as response:
        reply = json.load(response)
    return reply.get("app_access_token")
# Obtain the user_access_token.
def get_user_access_token(app_access_token, code):
    """Exchange an authorization code for a Feishu user access token.

    The original code called ``request.post`` on Flask's incoming-request
    object, which has no such method; the call is now a real HTTP POST made
    with the standard library.

    Args:
        app_access_token: Token sent as the ``Bearer`` credential.
        code: Authorization code sent with ``grant_type=authorization_code``.

    Returns:
        The ``access_token`` found under ``data`` in the JSON reply, or None
        when ``data`` is absent/null or lacks that key.

    Raises:
        urllib.error.URLError: on network failure or a non-2xx HTTP status.
    """
    url = "https://open.feishu.cn/open-apis/authen/v1/access_token"
    body = json.dumps({
        "grant_type": "authorization_code",
        "code": code,
    }).encode("utf-8")
    http_request = urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": f"Bearer {app_access_token}",
            "Content-Type": "application/json; charset=utf-8",
        },
        method="POST",
    )
    # Bounded timeout so a stalled upstream cannot hang the worker forever.
    with urllib.request.urlopen(http_request, timeout=10) as response:
        reply = json.load(response)
    # ``data`` may be present but null in the reply; treat that like absent.
    return (reply.get("data") or {}).get("access_token")
# Fetch the user's profile information.
def get_user_info(user_access_token):
    """Fetch the Feishu user info belonging to a user access token.

    The original code called ``request.get`` on Flask's incoming-request
    object, which does not perform an HTTP request; the call is now a real
    HTTP GET made with the standard library.

    Args:
        user_access_token: Token sent as the ``Bearer`` credential.

    Returns:
        The ``data`` member of the JSON reply, or None when it is absent.

    Raises:
        urllib.error.URLError: on network failure or a non-2xx HTTP status.
    """
    url = "https://open.feishu.cn/open-apis/authen/v1/user_info"
    http_request = urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {user_access_token}"},
        method="GET",
    )
    # Bounded timeout so a stalled upstream cannot hang the worker forever.
    with urllib.request.urlopen(http_request, timeout=10) as response:
        reply = json.load(response)
    return reply.get("data")
@app.route("/callback", methods=["GET"])
def callback():
    """Handle the login redirect: turn ``?code=`` into stored user info.

    Steps: read ``code`` from the query string, obtain an app access token,
    exchange the code for a user access token, fetch the user info, store it
    in ``session["user_info"]`` and echo it back as JSON.

    Returns:
        200 with the user info as JSON on success;
        400 when the ``code`` query parameter is missing;
        502 when the app access token or the user info could not be obtained;
        401 when the code could not be exchanged for a user access token.
    """
    code = request.args.get("code")
    if not code:
        return jsonify({"error": "Missing code parameter"}), 400

    # Each helper returns None when the upstream reply lacks the expected
    # field; stop early instead of sending "Bearer None" to the next call.
    app_access_token = get_app_access_token()
    if not app_access_token:
        return jsonify({"error": "Failed to obtain app access token"}), 502

    user_access_token = get_user_access_token(app_access_token, code)
    if not user_access_token:
        return jsonify({"error": "Failed to obtain user access token"}), 401

    user_info = get_user_info(user_access_token)
    if not user_info:
        return jsonify({"error": "Failed to fetch user info"}), 502

    # Persist the user info in the session for later requests.
    session["user_info"] = user_info
    return jsonify(user_info)