马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
好的,我会为你提供一个利用 Flask、pg8000 和 Pandas 构建的后台基本框架,用于手机理财产品 App 的报表分析接口。这个框架将包含异常处置惩罚、模块化的布局以支持多人协作,以及交易分析和收益分析的示例接口。
项目布局:
- financial_report_app/
- ├── app/
- │ ├── __init__.py # 初始化 Flask 应用和蓝图
- │ ├── config.py # 配置文件 (数据库连接等)
- │ ├── database.py # 数据库连接和操作辅助函数
- │ ├── errors.py # 统一的错误处理
- │ ├── models/ # (可选) 如果使用 ORM,可以在这里定义模型
- │ │ └── __init__.py
- │ ├── routes/ # API 路由定义
- │ │ ├── __init__.py
- │ │ ├── transaction_routes.py # 交易分析相关路由
- │ │ └── earning_routes.py # 收益分析相关路由
- │ ├── services/ # 业务逻辑层
- │ │ ├── __init__.py
- │ │ ├── transaction_service.py # 交易数据分析服务
- │ │ └── earning_service.py # 收益数据分析服务
- │ └── utils/ # 工具函数
- │ ├── __init__.py
- │ └── dataframe_utils.py # Pandas DataFrame 处理工具
- ├── run.py # 应用启动脚本
- ├── requirements.txt # 项目依赖
- └── README.md # 项目说明
复制代码 如今,我们来逐步创建这些文件和代码。
1. requirements.txt
- Flask>=2.3.0
- pg8000>=1.29.0
- pandas>=1.5.0
- Flask-Cors>=4.0.0 # 用于处理跨域请求,方便前端调试
- python-dotenv>=1.0.0 # 用于从 .env 文件加载环境变量
复制代码 2. .env (在 financial_report_app 目录下创建这个文件,不要提交到版本库)
- FLASK_APP=run.py
- FLASK_ENV=development # 可以是 development 或 production
- FLASK_DEBUG=True
- DB_USER=your_db_user
- DB_PASSWORD=your_db_password
- DB_HOST=localhost
- DB_PORT=5432
- DB_NAME=your_financial_db
复制代码 请务必将 your_db_user, your_db_password, your_financial_db 替换为你现实的 PostgreSQL 数据库信息。
3. app/config.py
- import os
- from dotenv import load_dotenv
- # 加载 .env 文件中的环境变量
- load_dotenv()
- class Config:
- """基本配置类"""
- SECRET_KEY = os.environ.get('SECRET_KEY') or 'a_very_secret_key_that_you_should_change'
- DEBUG = False
- TESTING = False
- # 数据库配置
- DB_USER = os.environ.get('DB_USER')
- DB_PASSWORD = os.environ.get('DB_PASSWORD')
- DB_HOST = os.environ.get('DB_HOST')
- DB_PORT = int(os.environ.get('DB_PORT', 5432))
- DB_NAME = os.environ.get('DB_NAME')
- if not all([DB_USER, DB_PASSWORD, DB_HOST, DB_NAME]):
- raise ValueError("数据库配置不完整,请检查 .env 文件。")
- class DevelopmentConfig(Config):
- """开发环境配置"""
- DEBUG = True
- FLASK_DEBUG = True # 确保 Flask 的调试模式也开启
- class TestingConfig(Config):
- """测试环境配置"""
- TESTING = True
- # 可以为测试环境配置一个单独的数据库
- DB_NAME = os.environ.get('TEST_DB_NAME') or 'your_financial_test_db'
- class ProductionConfig(Config):
- """生产环境配置"""
- DEBUG = False
- FLASK_DEBUG = False
- # 生产环境中应使用更强的 SECRET_KEY,并可能从环境变量或密钥管理服务加载
- # 根据 FLASK_ENV 环境变量选择配置
- config_by_name = dict(
- development=DevelopmentConfig,
- testing=TestingConfig,
- production=ProductionConfig,
- default=DevelopmentConfig
- )
- def get_config():
- env = os.getenv('FLASK_ENV', 'default')
- return config_by_name.get(env, DevelopmentConfig)
- current_config = get_config()
复制代码 4. app/database.py
- import pg8000.native
- import pandas as pd
- from flask import current_app, g # g 是 Flask 的应用上下文全局变量
- def get_db_connection():
- """获取数据库连接"""
- if 'db_conn' not in g:
- try:
- g.db_conn = pg8000.native.Connection(
- user=current_app.config['DB_USER'],
- password=current_app.config['DB_PASSWORD'],
- host=current_app.config['DB_HOST'],
- port=current_app.config['DB_PORT'],
- database=current_app.config['DB_NAME']
- )
- current_app.logger.info("数据库连接成功。")
- except Exception as e:
- current_app.logger.error(f"数据库连接失败: {e}")
- raise # 重新抛出异常,以便上层捕获
- return g.db_conn
- def close_db_connection(e=None):
- """关闭数据库连接"""
- db_conn = g.pop('db_conn', None)
- if db_conn is not None:
- db_conn.close()
- current_app.logger.info("数据库连接已关闭。")
- def query_db(query, args=None, fetch_one=False, commit=False):
- """
- 执行数据库查询。
- :param query: SQL 查询语句
- :param args: 查询参数 (元组)
- :param fetch_one: 是否只获取一条记录
- :param commit: 是否提交事务 (用于 INSERT, UPDATE, DELETE)
- :return: 查询结果 (列表或单个元组) 或 None
- """
- conn = None
- cursor = None
- try:
- conn = get_db_connection()
- cursor = conn.cursor()
- current_app.logger.debug(f"执行 SQL: {query} 参数: {args}")
- cursor.execute(query, args if args else [])
- if commit:
- conn.commit()
- current_app.logger.info("事务已提交。")
- return cursor.rowcount # 返回影响的行数
- if fetch_one:
- result = cursor.fetchone()
- return result
- else:
- results = cursor.fetchall()
- return results
- except pg8000.Error as e:
- current_app.logger.error(f"数据库查询错误: {e}")
- if conn and commit: # 如果是提交操作且出错,则回滚
- conn.rollback()
- current_app.logger.info("事务已回滚。")
- raise # 重新抛出,由全局错误处理器处理
- except Exception as e:
- current_app.logger.error(f"执行数据库操作时发生未知错误: {e}")
- raise
- finally:
- if cursor:
- cursor.close()
- # 注意:连接的关闭由 app_context_teardown 统一处理
- def query_db_to_dataframe(query, args=None, columns=None):
- """
- 执行数据库查询并将结果转换为 Pandas DataFrame。
- :param query: SQL 查询语句
- :param args: 查询参数 (元组)
- :param columns: (可选) DataFrame 的列名列表。如果为 None,则使用 cursor.description。
- :return: Pandas DataFrame 或 空 DataFrame
- """
- conn = None
- cursor = None
- try:
- conn = get_db_connection()
- cursor = conn.cursor()
- current_app.logger.debug(f"执行 SQL (to DataFrame): {query} 参数: {args}")
- cursor.execute(query, args if args else [])
- results = cursor.fetchall()
- if not results:
- return pd.DataFrame()
- if columns is None:
- columns = [desc[0] for desc in cursor.description]
- return pd.DataFrame(results, columns=columns)
- except pg8000.Error as e:
- current_app.logger.error(f"数据库查询到 DataFrame 错误: {e}")
- raise
- except Exception as e:
- current_app.logger.error(f"转换为 DataFrame 时发生未知错误: {e}")
- raise
- finally:
- if cursor:
- cursor.close()
- def init_app(app):
- """注册数据库关闭函数到应用"""
- app.teardown_appcontext(close_db_connection)
- # 可以在这里添加一个 ping 数据库的命令,用于 CLI
- # app.cli.add_command(init_db_command)
复制代码 5. app/errors.py
- from flask import jsonify, current_app
- import pg8000
- class AppError(Exception):
- """自定义应用基础错误类"""
- status_code = 500
- message = "服务器内部错误"
- def __init__(self, message=None, status_code=None, payload=None):
- super().__init__()
- if message is not None:
- self.message = message
- if status_code is not None:
- self.status_code = status_code
- self.payload = payload
- def to_dict(self):
- rv = dict(self.payload or ())
- rv['error'] = self.message
- rv['status_code'] = self.status_code
- return rv
- class BadRequestError(AppError):
- """请求参数错误"""
- status_code = 400
- message = "请求无效或参数错误"
- class NotFoundError(AppError):
- """资源未找到错误"""
- status_code = 404
- message = "请求的资源未找到"
- class DatabaseError(AppError):
- """数据库操作错误"""
- status_code = 500
- message = "数据库操作失败"
- def register_error_handlers(app):
- """注册全局错误处理器"""
- @app.errorhandler(AppError)
- def handle_app_error(error):
- response = jsonify(error.to_dict())
- response.status_code = error.status_code
- current_app.logger.error(f"AppError: {error.message}, Status: {error.status_code}, Payload: {error.payload}")
- return response
- @app.errorhandler(pg8000.Error)
- def handle_database_error(error):
- # 对 pg8000 的具体错误可以进行更细致的处理和记录
- current_app.logger.error(f"pg8000 Database Error: {error}")
- # 可以根据 error 类型返回更具体的错误信息,但对客户端通常隐藏细节
- app_err = DatabaseError(message="数据库服务暂时不可用或查询出错。")
- response = jsonify(app_err.to_dict())
- response.status_code = app_err.status_code
- return response
- @app.errorhandler(400)
- def handle_bad_request(error):
- # Flask 自带的 400 错误
- app_err = BadRequestError(message=error.description or "错误的请求。")
- response = jsonify(app_err.to_dict())
- response.status_code = app_err.status_code
- current_app.logger.warning(f"BadRequest (400): {app_err.message}")
- return response
- @app.errorhandler(404)
- def handle_not_found(error):
- # Flask 自带的 404 错误
- app_err = NotFoundError(message=error.description or "您请求的页面或资源不存在。")
- response = jsonify(app_err.to_dict())
- response.status_code = app_err.status_code
- current_app.logger.warning(f"NotFound (404): {app_err.message}")
- return response
- @app.errorhandler(Exception)
- def handle_generic_exception(error):
- # 捕获所有未被特定处理器捕获的异常
- current_app.logger.error(f"Unhandled Exception: {error}", exc_info=True)
- # 对于生产环境,避免泄露过多内部错误细节给客户端
- if current_app.config['DEBUG']:
- message = str(error)
- else:
- message = "服务器发生了一个未预料到的错误,请稍后再试。"
- app_err = AppError(message=message, status_code=500)
- response = jsonify(app_err.to_dict())
- response.status_code = app_err.status_code
- return response
复制代码 6. app/utils/__init__.py (可以为空)
7. app/utils/dataframe_utils.py
- import pandas as pd
- from app.errors import BadRequestError
- def validate_request_dates(start_date_str, end_date_str):
- """验证并转换日期字符串"""
- try:
- start_date = pd.to_datetime(start_date_str)
- end_date = pd.to_datetime(end_date_str)
- except ValueError:
- raise BadRequestError("日期格式无效,请使用 YYYY-MM-DD 格式。")
- if start_date > end_date:
- raise BadRequestError("开始日期不能晚于结束日期。")
- return start_date, end_date
- def format_chart_data(df, index_col, value_cols, chart_type="timeseries"):
- """
- 将 DataFrame 格式化为图表库(如 ECharts, Chart.js)常用的格式。
- 这是一个通用示例,具体格式可能需要根据前端图表库调整。
- :param df: Pandas DataFrame
- :param index_col: 作为X轴(通常是日期或类别)的列名
- :param value_cols: 作为Y轴值的列名列表
- :param chart_type: 图表类型,可用于未来扩展
- :return: 字典格式的图表数据
- """
- if df.empty:
- return {
- "categories": [],
- "series": [{"name": col, "data": []} for col in value_cols]
- }
- # 确保索引列是 datetime 类型,并格式化为字符串(如果需要)
- if pd.api.types.is_datetime64_any_dtype(df[index_col]):
- categories = df[index_col].dt.strftime('%Y-%m-%d').tolist()
- else:
- categories = df[index_col].astype(str).tolist()
- series_data = []
- for col in value_cols:
- if col in df.columns:
- series_data.append({
- "name": col,
- "data": df[col].tolist() # 确保数据是 JSON 可序列化的 (例如,不要有 NaN)
- })
- else:
- series_data.append({
- "name": col,
- "data": []
- })
- return {
- "categories": categories, # X 轴数据
- "series": series_data # Y 轴数据系列
- }
- # 更多 Pandas 相关的工具函数可以放在这里
- # 例如:数据清洗、聚合、计算增长率等
- def calculate_daily_summary(df, date_col, value_col, agg_func='sum'):
- """
- 按天汇总数据。
- :param df: DataFrame
- :param date_col: 日期列名
- :param value_col: 需要聚合的值列名
- :param agg_func: 聚合函数 ('sum', 'mean', 'count', etc.)
- :return: 汇总后的 DataFrame,索引为日期
- """
- if df.empty or date_col not in df.columns or value_col not in df.columns:
- return pd.DataFrame(columns=[date_col, value_col]).set_index(date_col)
- df_copy = df.copy()
- df_copy[date_col] = pd.to_datetime(df_copy[date_col])
- summary = df_copy.groupby(pd.Grouper(key=date_col, freq='D'))[value_col].agg(agg_func).reset_index()
- summary = summary.rename(columns={value_col: f'daily_{value_col}_{agg_func}'})
- return summary.set_index(date_col)
- def resample_timeseries_data(df, resample_freq='D', agg_dict=None):
- """
- 对时间序列 DataFrame 进行重采样和聚合。
- DataFrame 的索引必须是 DateTimeIndex。
- :param df: 输入的 DataFrame (DateTimeIndex)
- :param resample_freq: 重采样频率 ('D', 'W', 'M', 'Q', 'Y')
- :param agg_dict: 字典,键为列名,值为聚合函数或函数列表
- 例如: {'amount': 'sum', 'price': 'mean'}
- :return: 重采样和聚合后的 DataFrame
- """
- if not isinstance(df.index, pd.DatetimeIndex):
- raise ValueError("DataFrame的索引必须是DatetimeIndex才能进行重采样。")
- if df.empty:
- return pd.DataFrame()
- if agg_dict is None:
- # 默认对所有数值列求和
- numeric_cols = df.select_dtypes(include=pd.np.number).columns
- agg_dict = {col: 'sum' for col in numeric_cols}
- resampled_df = df.resample(resample_freq).agg(agg_dict)
- return resampled_df
复制代码 8. app/services/__init__.py (可以为空)
9. app/services/transaction_service.py
- import pandas as pd
- from app.database import query_db_to_dataframe
- from app.utils.dataframe_utils import validate_request_dates, format_chart_data, resample_timeseries_data
- from app.errors import NotFoundError, AppError
- from flask import current_app
- # 假设的数据库表名
- TRANSACTIONS_TABLE = "transactions" # 示例表名,请替换为你的实际表名
- PRODUCTS_TABLE = "products" # 示例表名
- class TransactionService:
- def get_transaction_analysis(self, user_id: int, start_date_str: str, end_date_str: str,
- transaction_type: str = None, product_id: int = None,
- group_by: str = 'day'):
- """
- 获取交易分析数据。
- :param user_id: 用户 ID
- :param start_date_str: 开始日期字符串 (YYYY-MM-DD)
- :param end_date_str: 结束日期字符串 (YYYY-MM-DD)
- :param transaction_type: (可选) 交易类型 (e.g., 'buy', 'sell')
- :param product_id: (可选) 产品 ID
- :param group_by: (可选) 分组依据 ('day', 'month', 'product', 'type')
- :return: 格式化后的图表数据
- """
- start_date, end_date = validate_request_dates(start_date_str, end_date_str)
- # 构建基础 SQL 查询
- # ****** 注意:这里的 SQL 查询是示例,你需要根据你的数据库表结构进行修改 ******
- # 常见的交易表字段可能包括:id, user_id, product_id, transaction_date,
- # transaction_type ('buy', 'sell', 'dividend', 'fee'), amount, quantity, price, currency
- sql_query = f"""
- SELECT
- t.transaction_date,
- t.transaction_type,
- t.amount,
- t.currency,
- p.product_name,
- p.product_category
- FROM {TRANSACTIONS_TABLE} t
- LEFT JOIN {PRODUCTS_TABLE} p ON t.product_id = p.product_id
- WHERE t.user_id = %s
- AND t.transaction_date >= %s
- AND t.transaction_date <= %s
- """
- params = [user_id, start_date, end_date]
- if transaction_type:
- sql_query += " AND t.transaction_type = %s"
- params.append(transaction_type)
- if product_id:
- sql_query += " AND t.product_id = %s"
- params.append(product_id)
- current_app.logger.info(f"查询交易数据: user_id={user_id}, start={start_date}, end={end_date}")
- try:
- df = query_db_to_dataframe(sql_query, tuple(params))
- except Exception as e:
- current_app.logger.error(f"获取交易数据时出错: {e}")
- raise AppError("获取交易数据失败")
- if df.empty:
- current_app.logger.info("未找到符合条件的交易数据。")
- # 返回空的图表结构
- return format_chart_data(pd.DataFrame(), 'date', ['value'])
- # -------- Pandas 数据分析示例 --------
- # 确保 'transaction_date' 是 datetime 类型
- df['transaction_date'] = pd.to_datetime(df['transaction_date'])
- df = df.sort_values(by='transaction_date')
- # 根据 group_by 参数进行不同的聚合分析
- chart_data = {}
- if group_by == 'day':
- # 按天汇总交易总额
- daily_summary = df.set_index('transaction_date').resample('D')['amount'].sum().reset_index()
- daily_summary = daily_summary.rename(columns={'transaction_date': 'date', 'amount': 'total_amount'})
- chart_data = format_chart_data(daily_summary, index_col='date', value_cols=['total_amount'])
- chart_data['title'] = "每日交易总额"
- elif group_by == 'month':
- # 按月汇总交易总额
- monthly_summary = df.set_index('transaction_date').resample('M')['amount'].sum().reset_index()
- monthly_summary['transaction_date'] = monthly_summary['transaction_date'].dt.to_period('M').astype(str) # X轴标签
- monthly_summary = monthly_summary.rename(columns={'transaction_date': 'month', 'amount': 'total_amount'})
- chart_data = format_chart_data(monthly_summary, index_col='month', value_cols=['total_amount'])
- chart_data['title'] = "每月交易总额"
- elif group_by == 'product_category':
- # 按产品类别汇总交易额
- category_summary = df.groupby('product_category')['amount'].sum().reset_index()
- category_summary = category_summary.sort_values(by='amount', ascending=False)
- # 这种通常是饼图或柱状图
- chart_data = {
- "title": "各产品类别交易额占比",
- "type": "pie", # 或 "bar"
- "labels": category_summary['product_category'].tolist(),
- "data": category_summary['amount'].tolist()
- }
- # 或者使用 format_chart_data 转换成通用的系列格式
- # chart_data = format_chart_data(category_summary, index_col='product_category', value_cols=['amount'])
- # chart_data['title'] = "各产品类别交易额"
- elif group_by == 'transaction_type':
- # 按交易类型汇总
- type_summary = df.groupby('transaction_type')['amount'].sum().reset_index()
- # 这种通常是饼图或柱状图
- chart_data = {
- "title": "各交易类型金额占比",
- "type": "pie",
- "labels": type_summary['transaction_type'].tolist(),
- "data": type_summary['amount'].tolist()
- }
- else:
- # 默认按天汇总
- daily_summary = df.set_index('transaction_date').resample('D')['amount'].sum().reset_index()
- daily_summary = daily_summary.rename(columns={'transaction_date': 'date', 'amount': 'total_amount'})
- chart_data = format_chart_data(daily_summary, index_col='date', value_cols=['total_amount'])
- chart_data['title'] = "每日交易总额 (默认)"
- current_app.logger.info(f"交易分析完成: group_by={group_by}")
- return chart_data
复制代码 重要提示关于 transaction_service.py 中的 SQL:
- 表名和列名: TRANSACTIONS_TABLE, PRODUCTS_TABLE, t.transaction_date, t.amount, p.product_name, p.product_category 等都是示例。你需要将它们替换为你数据库中现实的表名和列名。
- 数据类型: 确保 transaction_date 在数据库中是 DATE 或 TIMESTAMP 类型,amount 是数值类型 (如 DECIMAL 或 NUMERIC)。
- JOINs: 如果产品信息在不同的表中,你需要利用 JOIN。
10. app/services/earning_service.py
- import pandas as pd
- from app.database import query_db_to_dataframe
- from app.utils.dataframe_utils import validate_request_dates, format_chart_data
- from app.errors import NotFoundError, AppError
- from flask import current_app
- # 假设的数据库表名
- EARNINGS_TABLE = "earnings" # 示例表名,请替换为你的实际表名
- PRODUCTS_TABLE = "products" # 示例表名
- class EarningService:
- def get_earning_analysis(self, user_id: int, start_date_str: str, end_date_str: str,
- product_id: int = None, earning_type: str = None,
- group_by: str = 'day'):
- """
- 获取收益分析数据。
- :param user_id: 用户 ID
- :param start_date_str: 开始日期字符串 (YYYY-MM-DD)
- :param end_date_str: 结束日期字符串 (YYYY-MM-DD)
- :param product_id: (可选) 产品 ID
- :param earning_type: (可选) 收益类型 (e.g., 'interest', 'dividend', 'capital_gain')
- :param group_by: (可选) 分组依据 ('day', 'month', 'product', 'type')
- :return: 格式化后的图表数据
- """
- start_date, end_date = validate_request_dates(start_date_str, end_date_str)
- # ****** 注意:这里的 SQL 查询是示例,你需要根据你的数据库表结构进行修改 ******
- # 常见的收益表字段可能包括:id, user_id, product_id, earning_date,
- # earning_type ('interest', 'dividend', 'realized_gain', 'unrealized_gain'), amount, currency
- sql_query = f"""
- SELECT
- e.earning_date,
- e.earning_type,
- e.amount,
- e.currency,
- p.product_name,
- p.product_category
- FROM {EARNINGS_TABLE} e
- LEFT JOIN {PRODUCTS_TABLE} p ON e.product_id = p.product_id
- WHERE e.user_id = %s
- AND e.earning_date >= %s
- AND e.earning_date <= %s
- """
- params = [user_id, start_date, end_date]
- if product_id:
- sql_query += " AND e.product_id = %s"
- params.append(product_id)
- if earning_type:
- sql_query += " AND e.earning_type = %s"
- params.append(earning_type)
- current_app.logger.info(f"查询收益数据: user_id={user_id}, start={start_date}, end={end_date}")
- try:
- df = query_db_to_dataframe(sql_query, tuple(params))
- except Exception as e:
- current_app.logger.error(f"获取收益数据时出错: {e}")
- raise AppError("获取收益数据失败")
- if df.empty:
- current_app.logger.info("未找到符合条件的收益数据。")
- return format_chart_data(pd.DataFrame(), 'date', ['value'])
- df['earning_date'] = pd.to_datetime(df['earning_date'])
- df = df.sort_values(by='earning_date')
- chart_data = {}
- if group_by == 'day':
- # 按天汇总收益
- daily_summary = df.set_index('earning_date').resample('D')['amount'].sum().reset_index()
- daily_summary = daily_summary.rename(columns={'earning_date': 'date', 'amount': 'total_earnings'})
- chart_data = format_chart_data(daily_summary, index_col='date', value_cols=['total_earnings'])
- chart_data['title'] = "每日总收益"
- elif group_by == 'month':
- # 按月汇总收益
- monthly_summary = df.set_index('earning_date').resample('M')['amount'].sum().reset_index()
- monthly_summary['earning_date'] = monthly_summary['earning_date'].dt.to_period('M').astype(str)
- monthly_summary = monthly_summary.rename(columns={'earning_date': 'month', 'amount': 'total_earnings'})
- chart_data = format_chart_data(monthly_summary, index_col='month', value_cols=['total_earnings'])
- chart_data['title'] = "每月总收益"
- elif group_by == 'product':
- # 按产品汇总收益
- product_summary = df.groupby('product_name')['amount'].sum().reset_index()
- product_summary = product_summary.sort_values(by='amount', ascending=False)
- chart_data = {
- "title": "各产品收益贡献",
- "type": "bar", # or pie
- "labels": product_summary['product_name'].tolist(),
- "data": product_summary['amount'].tolist()
- }
- elif group_by == 'earning_type':
- # 按收益类型汇总
- type_summary = df.groupby('earning_type')['amount'].sum().reset_index()
- chart_data = {
- "title": "各收益类型占比",
- "type": "pie",
- "labels": type_summary['earning_type'].tolist(),
- "data": type_summary['amount'].tolist()
- }
- else:
- daily_summary = df.set_index('earning_date').resample('D')['amount'].sum().reset_index()
- daily_summary = daily_summary.rename(columns={'earning_date': 'date', 'amount': 'total_earnings'})
- chart_data = format_chart_data(daily_summary, index_col='date', value_cols=['total_earnings'])
- chart_data['title'] = "每日总收益 (默认)"
- current_app.logger.info(f"收益分析完成: group_by={group_by}")
- return chart_data
复制代码 重要提示关于 earning_service.py 中的 SQL:
- 表名和列名: EARNINGS_TABLE, e.earning_date, e.amount, e.earning_type 等都是示例。请替换为你的现实表名和列名。
- 收益类型: earning_type 的值(如 'interest', 'dividend')也需要与你数据库中的定义一致。
11. app/routes/__init__.py
- from flask import Blueprint
- # 创建 API 蓝图
- # url_prefix 会给这个蓝图下的所有路由加上 /api 前缀
- api_bp = Blueprint('api', __name__, url_prefix='/api')
- # 导入各个模块的路由,这样在 __init__.py 中创建 app 时可以注册它们
- # 放在这里导入是为了避免循环依赖
- from . import transaction_routes
- from . import earning_routes
- # 健康检查路由,方便确认服务是否运行
- @api_bp.route('/health', methods=['GET'])
- def health_check():
- return {"status": "healthy", "message": "API is running."}, 200
复制代码 12. app/routes/transaction_routes.py
- from flask import request, jsonify, current_app
- from . import api_bp # 从同级 __init__.py 导入蓝图
- from app.services.transaction_service import TransactionService
- from app.errors import BadRequestError
- # from app.auth import token_required # 如果需要认证,取消注释
- @api_bp.route('/transactions/analysis', methods=['POST'])
- # @token_required # 如果需要认证,取消注释并确保 auth.py 已实现
- def transaction_analysis_report():
- """
- 交易分析图表接口
- 请求体 (JSON):
- {
- "user_id": 123,
- "start_date": "2024-01-01",
- "end_date": "2024-12-31",
- "transaction_type": "buy", // 可选
- "product_id": 1, // 可选
- "group_by": "month" // 可选: 'day', 'month', 'product_category', 'transaction_type'
- }
- """
- data = request.get_json()
- if not data:
- raise BadRequestError("请求体不能为空且必须是 JSON 格式。")
- user_id = data.get('user_id')
- start_date = data.get('start_date')
- end_date = data.get('end_date')
- if not all([user_id, start_date, end_date]):
- raise BadRequestError("缺少必要的参数: user_id, start_date, end_date。")
- try:
- user_id = int(user_id)
- except ValueError:
- raise BadRequestError("user_id 必须是整数。")
- transaction_type = data.get('transaction_type')
- product_id_str = data.get('product_id')
- product_id = None
- if product_id_str:
- try:
- product_id = int(product_id_str)
- except ValueError:
- raise BadRequestError("product_id (如果提供) 必须是整数。")
- group_by = data.get('group_by', 'day') # 默认为 'day'
- service = TransactionService()
- try:
- current_app.logger.info(f"请求交易分析: user_id={user_id}, start={start_date}, end={end_date}, group_by={group_by}")
- chart_data = service.get_transaction_analysis(
- user_id=user_id,
- start_date_str=start_date,
- end_date_str=end_date,
- transaction_type=transaction_type,
- product_id=product_id,
- group_by=group_by
- )
- return jsonify({"status": "success", "data": chart_data}), 200
- except BadRequestError as e:
- raise e # 直接抛出,由全局处理器处理
- except Exception as e:
- current_app.logger.error(f"处理交易分析请求时发生错误: {e}", exc_info=True)
- # 对于未预料的错误,返回一个通用的 AppError
- from app.errors import AppError
- raise AppError("生成交易分析报告时发生内部错误。")
复制代码 13. app/routes/earning_routes.py
- from flask import request, jsonify, current_app
- from . import api_bp
- from app.services.earning_service import EarningService
- from app.errors import BadRequestError
- # from app.auth import token_required
- @api_bp.route('/earnings/analysis', methods=['POST'])
- # @token_required
- def earning_analysis_report():
- """
- 收益分析图表接口
- 请求体 (JSON):
- {
- "user_id": 123,
- "start_date": "2024-01-01",
- "end_date": "2024-12-31",
- "product_id": 1, // 可选
- "earning_type": "dividend", // 可选
- "group_by": "month" // 可选: 'day', 'month', 'product', 'earning_type'
- }
- """
- data = request.get_json()
- if not data:
- raise BadRequestError("请求体不能为空且必须是 JSON 格式。")
- user_id = data.get('user_id')
- start_date = data.get('start_date')
- end_date = data.get('end_date')
- if not all([user_id, start_date, end_date]):
- raise BadRequestError("缺少必要的参数: user_id, start_date, end_date。")
- try:
- user_id = int(user_id)
- except ValueError:
- raise BadRequestError("user_id 必须是整数。")
- product_id_str = data.get('product_id')
- product_id = None
- if product_id_str:
- try:
- product_id = int(product_id_str)
- except ValueError:
- raise BadRequestError("product_id (如果提供) 必须是整数。")
- earning_type = data.get('earning_type')
- group_by = data.get('group_by', 'day')
- service = EarningService()
- try:
- current_app.logger.info(f"请求收益分析: user_id={user_id}, start={start_date}, end={end_date}, group_by={group_by}")
- chart_data = service.get_earning_analysis(
- user_id=user_id,
- start_date_str=start_date,
- end_date_str=end_date,
- product_id=product_id,
- earning_type=earning_type,
- group_by=group_by
- )
- return jsonify({"status": "success", "data": chart_data}), 200
- except BadRequestError as e:
- raise e
- except Exception as e:
- current_app.logger.error(f"处理收益分析请求时发生错误: {e}", exc_info=True)
- from app.errors import AppError
- raise AppError("生成收益分析报告时发生内部错误。")
复制代码 14. app/__init__.py
- import logging
- from flask import Flask
- from flask_cors import CORS
- from app.config import current_config, get_config
- from app.routes import api_bp # 从 app.routes 包导入蓝图
- from app.errors import register_error_handlers
- from app.database import init_app as init_db_app
- def create_app(config_name=None):
- """应用工厂函数"""
- app = Flask(__name__)
- # 1. 加载配置
- if config_name:
- app_config = get_config(config_name)
- else:
- app_config = current_config # 使用 .env 中 FLASK_ENV 决定的配置
- app.config.from_object(app_config)
- # 2. 配置日志
- # 如果不是调试模式,可以配置更详细的日志记录到文件
- if not app.debug and not app.testing:
- # 可以配置 logging.FileHandler() 等
- pass
- else: # 开发和测试时,使用 Flask 的默认 logger,输出到控制台
- app.logger.setLevel(logging.DEBUG) # 确保 DEBUG 级别的日志能被看到
- app.logger.info(f"应用以 '{app.config.get('ENV', 'unknown environment')}'模式启动。")
- app.logger.info(f"调试模式: {'开启' if app.debug else '关闭'}")
- # 3. 初始化扩展
- CORS(app, resources={r"/api/*": {"origins": "*"}}) # 允许所有来源访问 /api/* 路径,生产环境应配置具体来源
- init_db_app(app) # 初始化数据库相关,主要是注册 teardown context
- # 4. 注册蓝图
- app.register_blueprint(api_bp) # api_bp 已经自带了 /api 前缀
- # 5. 注册错误处理器
- register_error_handlers(app)
- # 6. (可选) 添加 CLI 命令
- # from .commands import create_tables_command
- # app.cli.add_command(create_tables_command)
- app.logger.info("Flask 应用创建完成。")
- return app
复制代码 15. run.py (在 financial_report_app 目录下)
- import os
- from app import create_app
- # 从环境变量获取配置名称,默认为 'development'
- # FLASK_ENV 环境变量通常由 .flaskenv 或 .env 文件设置,或者在启动时直接指定
- config_name = os.getenv('FLASK_ENV') or 'development'
- app = create_app(config_name)
- if __name__ == '__main__':
- # host='0.0.0.0' 使其可以从外部访问 (如果防火墙允许)
- # debug=True 会启用 Flask 的调试器和重载器,通常由 FLASK_DEBUG 环境变量控制
- # port 可以自定义
- app.run(host='0.0.0.0', port=5000)
复制代码 16. README.md (在 financial_report_app 目录下)
Markdown
- # 手机理财产品 App 后台 - 报表分析接口
- 本项目是一个基于 Flask、pg8000 和 Pandas 的 Python 后台服务,为手机理财 App 提供报表分析接口。
- ## 项目结构
复制代码 financial_report_app/ ├── app/ │ ├── init.py │ ├── config.py │ ├── database.py │ ├── errors.py │ ├── models/ │ ├── routes/ │ │ ├── init.py │ │ ├── transaction_routes.py │ │ └── earning_routes.py │ ├── services/ │ │ ├── init.py │ │ ├── transaction_service.py │ │ └── earning_service.py │ └── utils/ │ ├── init.py │ └── dataframe_utils.py ├── .env.example # 环境变量示例文件 ├── .flaskenv # (可选) Flask CLI 环境变量 ├── run.py ├── requirements.txt └── README.md
- ## 环境搭建与运行
- ### 1. 先决条件
- - Python 3.8+
- - PostgreSQL 数据库
- ### 2. 克隆项目
- ```bash
- git clone <your-repository-url>
- cd financial_report_app
复制代码 3. 创建并激活假造环境
Bash
- python -m venv venv
- # Windows
- venv\Scripts\activate
- # macOS/Linux
- source venv/bin/activate
复制代码 4. 安装依靠
- pip install -r requirements.txt
复制代码 5. 配置环境变量
复制 .env.example (如果提供了) 或手动创建一个 .env 文件,并填入以下内容:
代码段
- FLASK_APP=run.py
- FLASK_ENV=development # 'development' 或 'production'
- FLASK_DEBUG=True # 'True' 或 'False'
- DB_USER=your_postgres_user
- DB_PASSWORD=your_postgres_password
- DB_HOST=localhost
- DB_PORT=5432
- DB_NAME=your_financial_database_name
- SECRET_KEY=your_very_secret_and_unique_key # 请务必修改此项
复制代码 重要:
- 将 your_postgres_user, your_postgres_password, your_financial_database_name 替换为你现实的 PostgreSQL 数据库凭据和数据库名。
- 确保 SECRET_KEY 是一个复杂且唯一的字符串。
6. 数据库准备
- 确保你的 PostgreSQL 服务正在运行。
- 手动毗连到你的 PostgreSQL 数据库,并创建必要的表。 本项目中的示例 SQL 查询假设存在以下表(你需要根据现实情况调整):
- transactions (交易记载表)
- user_id (INT)
- product_id (INT)
- transaction_date (TIMESTAMP or DATE)
- transaction_type (VARCHAR, e.g., 'buy', 'sell')
- amount (DECIMAL or NUMERIC)
- currency (VARCHAR)
- ... (其他相干字段)
- earnings (收益记载表)
- user_id (INT)
- product_id (INT)
- earning_date (TIMESTAMP or DATE)
- earning_type (VARCHAR, e.g., 'dividend', 'interest')
- amount (DECIMAL or NUMERIC)
- currency (VARCHAR)
- ... (其他相干字段)
- products (产品信息表)
- product_id (INT, PRIMARY KEY)
- product_name (VARCHAR)
- product_category (VARCHAR)
- ... (其他相干字段)
你需要根据 app/services/transaction_service.py 和 app/services/earning_service.py 中的 SQL 查询来创建或调整你的表布局。
7. 运行应用
- flask run
- # 或者直接运行 run.py (如果 .env 中配置了 FLASK_APP=run.py)
- # python run.py
复制代码 应用默认会在 http://127.0.0.1:5000/ (或 http://0.0.0.0:5000/) 启动。
API 接口
全部 API 接口都以 /api 为前缀。
健康查抄
- URL: /api/health
- Method: GET
- Success Response: JSON
- {
- "status": "healthy",
- "message": "API is running."
- }
复制代码 交易分析接口
- URL: /api/transactions/analysis
- Method: POST
- Request Body (JSON): JSON
- {
- "user_id": 123,
- "start_date": "2024-01-01", // YYYY-MM-DD
- "end_date": "2024-12-31", // YYYY-MM-DD
- "transaction_type": "buy", // 可选
- "product_id": 1, // 可选
- "group_by": "month" // 可选, e.g., "day", "month", "product_category", "transaction_type"
- }
复制代码 - Success Response (Example for group_by: 'day'): JSON
- {
- "status": "success",
- "data": {
- "title": "每日交易总额",
- "categories": ["2024-01-01", "2024-01-02", "..."], // X轴
- "series": [ // Y轴系列
- {
- "name": "total_amount",
- "data": [1500.50, 230.00, "..."]
- }
- ]
- }
- }
复制代码 留意: data 布局会根据 group_by 参数和现实分析结果变化。
收益分析接口
- URL: /api/earnings/analysis
- Method: POST
- Request Body (JSON): JSON
- {
- "user_id": 123,
- "start_date": "2024-01-01",
- "end_date": "2024-12-31",
- "product_id": 1, // 可选
- "earning_type": "dividend", // 可选
- "group_by": "month" // 可选, e.g., "day", "month", "product", "earning_type"
- }
复制代码 - Success Response (Example for group_by: 'month'): JSON
- {
- "status": "success",
- "data": {
- "title": "每月总收益",
- "categories": ["2024-01", "2024-02", "..."],
- "series": [
- {
- "name": "total_earnings",
- "data": [50.20, 75.10, "..."]
- }
- ]
- }
- }
复制代码 错误处置惩罚
API 会返回尺度的 HTTP 状态码和 JSON 格式的错误信息:
JSON
- {
- "error": "错误描述信息",
- "status_code": 400 // 或其他 HTTP 状态码
- }
复制代码 多人协作
- 模块化计划: 项目按功能(配置、路由、服务、工具)划分模块,方便分工。
- Service 层: 业务逻辑集中在 app/services,与路由处置惩罚分离。
- 数据库抽象: app/database.py 提供统一的数据库访问方法。
- 统一错误处置惩罚: app/errors.py 定义和处置惩罚应用级错误。
- 环境变量: 利用 .env 管理敏感配置,不提交到版本库。
- 代码风格: 建议利用 flake8 或 black 等工具统一代码风格。
- 版本控制: 利用 Git 进行版本控制,遵循合适的分支计谋(如 Gitflow)。
待办/未来改进
- [ ] 添加用户认证和授权 (e.g., JWT)
- [ ] 更完善的输入验证 (e.g., using Marshmallow or Pydantic)
- [ ] 数据库迁移管理 (e.g., Alembic)
- [ ] 单元测试和集成测试
- [ ] 更详细的日记配置和监控
- [ ] 异步任务处置惩罚 (e.g., Celery)
- [ ] ORM 集成 (e.g., SQLAlchemy),如果业务逻辑更复杂
- [ ] API 文档生成 (e.g., Swagger/OpenAPI with Flask-RESTx or Flasgger)
<!-- end list -->
- ---**怎样运行:**1. **创建项目文件夹**: `mkdir financial_report_app && cd financial_report_app`2. **创建子目录**: 按照上面的项目布局创建 `app`, `app/routes`, `app/services`, `app/utils`, `app/models` (如果需要) 目录。3. **生存文件**: 将上面提供的每个代码块生存到对应的文件中。4. **创建 `.env` 文件**: 在 `financial_report_app` 目录下创建 `.env` 文件,并填入你的数据库配置。5. **安装依靠**: ```bash python -m venv venv source venv/bin/activate # Linux/macOS # venv\Scripts\activate # Windows pip install -r requirements.txt
- ```6. **准备数据库**: * 确保你的 PostgreSQL 服务器正在运行。 * 创建一个数据库 (例如,你在 `.env` 中配置的 `your_financial_db`)。 * **重要**: 在该数据库中创建 `transactions`, `earnings`, 和 `products` 表。这些表的布局需要与 `transaction_service.py` 和 `earning_service.py` 中的 SQL 查询所盼望的列名和数据类型相匹配。你需要手动编写 `CREATE TABLE` 语句。 **示例 `CREATE TABLE` 语句 (你需要根据你的需求调整):** ```sql -- 产品表 CREATE TABLE products ( product_id SERIAL PRIMARY KEY, product_name VARCHAR(255) NOT NULL, product_category VARCHAR(100) ); -- 交易表 CREATE TABLE transactions ( transaction_id SERIAL PRIMARY KEY, user_id INTEGER NOT NULL, product_id INTEGER REFERENCES products(product_id), transaction_date TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP, transaction_type VARCHAR(50) NOT NULL, -- 'buy', 'sell', 'dividend_in', 'interest_in', 'fee_out' amount DECIMAL(18, 4) NOT NULL, -- 交易金额 quantity DECIMAL(18, 8), -- 交易数量 (可选, 如股票份额) price_per_unit DECIMAL(18, 4), -- 单价 (可选) currency VARCHAR(10) NOT NULL DEFAULT 'CNY', description TEXT ); CREATE INDEX idx_transactions_user_date ON transactions (user_id, transaction_date); CREATE INDEX idx_transactions_type ON transactions (transaction_type); -- 收益表 (可以是交易表的一部分,也可以是独立的表) -- 如果收益直接记载在交易表中 (例如 transaction_type = 'dividend_in'), 则大概不需要此表 -- 如果有复杂的已实现/未实现收益计算,则大概需要此表或更复杂的逻辑 CREATE TABLE earnings ( earning_id SERIAL PRIMARY KEY, user_id INTEGER NOT NULL, product_id INTEGER REFERENCES products(product_id), earning_date DATE NOT NULL, earning_type VARCHAR(50) NOT NULL, -- 'realized_gain', 'unrealized_gain', 'dividend', 'interest' amount DECIMAL(18, 4) NOT NULL, currency VARCHAR(10) NOT NULL DEFAULT 'CNY', related_transaction_id INTEGER REFERENCES transactions(transaction_id) -- (可选)关联的交易 ); CREATE INDEX idx_earnings_user_date ON earnings (user_id, earning_date); CREATE INDEX idx_earnings_type ON earnings (earning_type); -- 插入一些示例产品数据 (可选,用于测试) INSERT INTO products (product_name, product_category) VALUES ('稳健增长基金A', '基金'), ('高科技股票ETF', 'ETF'), ('定期存款产品X', '固收'); ```7. **运行应用**: ```bash flask run ``` 或者,如果 `flask run` 不工作 (大概因为 `FLASK_APP` 未被精确识别),实验: ```bash python run.py ``` 应用应该会在 `http://localhost:5000` (或 `http://0.0.0.0:5000`) 启动。**测试接口 (利用 `curl` 或 Postman):****交易分析接口:**```bashcurl -X POST -H "Content-Type: application/json" -d '{ "user_id": 1, "start_date": "2023-01-01", "end_date": "2025-12-31", "group_by": "month"}' http://localhost:5000/api/transactions/analysis
复制代码 收益分析接口:
Bash
- curl -X POST -H "Content-Type: application/json" -d '{
- "user_id": 1,
- "start_date": "2023-01-01",
- "end_date": "2025-12-31",
- "group_by": "product"
- }' http://localhost:5000/api/earnings/analysis
复制代码 请务必根据你的现实数据库表布局和数据来调整 services 层的 SQL 查询和 Pandas 分析逻辑。 这个框架提供了一个出发点,你可以根据具体需求进行扩展和修改。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |