股票学习网

如何学炒股,入门炒股,股票入门,股票怎么玩,学习炒股网,股票技术,股票知识学习 - - 股票知识网!

炒股软件开发设计文档示例(炒股软件开发定制)

2023-08-30 19:22分类:资金仓位 阅读:

本篇文章给大家谈谈炒股软件开发设计文档示例,以及炒股软件开发定制的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

文章详情介绍:

使用Python编写一个简单的股票逃顶策略

文/公社哥

下面是一个简单的股票逃顶策略的Python代码示例:

import pandas as pd

import numpy as np

import matplotlib.pyplot as plt

# 计算股票的逃顶信号

def calculate_escape_signal(df, n=5, threshold=0.05):

df['highest_high'] = df['high'].rolling(window=n).max()

df['escape_signal'] = np.where(df['close'] < (1 - threshold) * df['highest_high'], 1, 0)

return df

# 绘制股票价格和逃顶信号的图表

def plot_escape_signal(df):

fig, ax1 = plt.subplots(figsize=(10, 6))

ax1.plot(df['date'], df['close'], 'b-')

ax1.set_xlabel('Date')

ax1.set_ylabel('Price', color='b')

ax1.tick_params('y', colors='b')

ax2 = ax1.twinx()

ax2.plot(df['date'], df['escape_signal'], 'r-')

ax2.set_ylabel('Escape Signal', color='r')

ax2.tick_params('y', colors='r')

fig.tight_layout()

plt.show()

# 读取股票数据

df = pd.read_csv('data.csv')

# 计算逃顶信号

df = calculate_escape_signal(df)

# 绘制图表

plot_escape_signal(df)

上述代码中,calculate_escape_signal函数用于计算股票的逃顶,plot_escape_signal函数用于绘制股票价格和逃顶信号的图表。在逃顶信号的计算中,我们设定了一个阈值(threshold),当股票收盘价低于最高价的(1 - 阈值)时,认为出现逃顶信号。可以根据实际需求调整参数。另外,代码中的数据来源于"data.csv"文件,请根据实际情况修改文件路径或使用其他方式获取数据。以上代码仅用于软件学习研究之用,不构成投资建议。

量化投资系列之《股票日行情数据本地化》

量化投资系列之《股票日行情数据本地化》

上篇介绍了如何搭建可编程量化环境,所以已经具备了本地数据存储的环境了,这篇将继续介绍通过 akShare 获取股票日行情数据,并存储到 clickhouse 数据库中,并且本地化查询某股票某时间段内的日行情数据。

数据本地化设计

1、流程设计

数据本地化的设计,流程应该是初次更新时,是时间 t1 到现在 t2 的时间段内所有沪深 A 股的股票数据,随时间推移,只需增量更新 t2 到最新的时间的日行情数据,所以需要一份本地配置文件记录每个股票更新的时间节点。

动态增量更新日行情数据本地化流程图如下:

2、代码设计

由于刚接触 python ,语法不是很熟悉,不懂直接看 python 语法文档的,所以现在的代码设计是面向过程的,后面熟悉 python 了再重构面向对象。

虽然不是面向对象,但是文件所负责的功能要清晰,便于后面维护和扩展,模块划分主要有:配置信息模块、网络数据获取模块、数据库管理模块、可视化模块。现在基础版是这几个模块,后续会添加策略模块、机器识别模块、机器学习模块等等扩展模块。

模块如表所示:

py文件 描述 ak_stock_data.py 通过 akShare 获取接口数据 stock_config.py 配置日行情数据增量更新的信息 stock_constant.py 放一些常量,例如:路径 stock_db.py 负责于 clickhouse 数据库进行交互,存储和本地获取 stock_plot.py 负责股票数据可视化

数据表

数据表的设计,只需要股票的必要字段,以及可以覆盖性更新。何为覆盖性更新,例如:股票 A 时间 t1 的日行情数据,多次更新数据库,不会产生多条数据,而是覆盖原来的 t1 数据。

表字段说明如下:

名称 类型 描述 date Date 交易日 code String 股票代码 open Float32 开盘价 high Float32 最高价 low Float32 最低价 close Float32 收盘价 volume Float64 成交量,注意单位: 手 amount Float64 成交额,注意单位: 元

表创建的 sql 语句如下:

CREATE TABLE stock.stock_daily_price ( `date` Date, `code` String, `open` Float32, `high` Float32, `low` Float32, `close` Float32, `volume` Float64, `amount` Float64 ) ENGINE = ReplacingMergeTree() ORDER BY (javaHash(code), date)

说明: ENGINE 引擎,采用表 ReplacingMergeTree,它会删除排序键值相同的重复项,从而达到去除重复值。 排序键值 ORDER BY (javaHash(code), date) :javaHash 函数对股票代码 code 进行 hash 值提取。ORDER BY (javaHash(code), date) 的意思是,按股票代码 code 的 hash 值和时间 date 进行排序。

按上所述创建数据表,就具有了一个可以通过股票代码和日期作为唯一排序值,去除重复插入的数值,保证了股票在某天日行情数据的唯一性。

增量更新

想设计一个更新过的数据不再更新,只更新没更新的数据,很自然要一份配置表,在日行情数据本地化之前,只需要读取某只股票更新到那里了,然后接最后更新的日期到现在的时间节点为所需更新时间段。所以我们需要一个字段:股票最后更新日期。当然,网络数据更新不可靠,会有失败情况,所以需要一个字段记录更新失败的次数。初级版大概是这样子

配置表如下:

名称 类型 描述 code String 股票代码 name String 股票名称 daily_update_time Date 最后一次更新的日期
格式:20220903 error_daily_update_count int 更新错误次数
success:0 fail:+1

说明:error_daily_update_count 是记录失败次数,如果更新很多次,还是失败的,可能这股票退市了或怎么了,可以定期清除这些无用股票。

stock_config.py

import os import pandas as pd import stock_constant as sc def create_config(): """ 配置文件参数 code:股票代码 ,name: 股票名称 last_update_time: 最后一次更新的日期 error_update_count:请求更新错误次数统计,定期清理多次更新错误的股票代码 :return: DataFrame """ columns = ['code', 'name', 'daily_update_time', 'error_daily_update_count'] df = pd.DataFrame(columns=columns) save_config(df) return df def read_config(is_new=False): """ is_new if true del config else if part update 如果是新则删除配置信息,重置,重新更新,否则增量更新 读取配置信息,并对返回的dataFrame 数据类型进行预处理 :return: """ if is_new: os.remove(sc.config_path) print('删除配置成功,重置配置...') try: df = pd.read_csv(sc.config_path) except Exception as error: print("创建配置信息:", error) df = create_config() # code 强制转str类型,并补全股票代码为6位 df['code'] = df['code'].astype(str) df['code'] = df['code'].str.zfill(6) df['error_daily_update_count'] = df['error_daily_update_count'].astype(int) return df def save_config(df): """ 保存配置信息,忽略列 :param df: :return: """ df.to_csv(sc.config_path, encoding='utf-8', index=False)

该模块,提供配置信息保存和获取 注意:获取配置时 code 强制转str类型,并补全股票代码为6位。保存配置时,忽略列的保存。

效果图:

日行情数据本地化

akShare 三个接口即可实现 获取沪深 A 股所有股票,通过接口 ak.stock_sh_a_spot_em() ,ak.stock_sz_a_spot_em(),通过这两个接口,即可获取沪深 A 股所有股票代码,然后清洗数据保存必要的数据到配置文件中 config.csv。 通过接口 ak.stock_zh_a_hist() 获取某股票某时间段的日行情数据,然后保存到数据库中,更新配置信息。

注意:需要修改网络 dataFrame 的列名,再保存到数据库中,列如:接口的列名是,['日期', '开盘', '最高', '最低', '收盘', '成交量', '成交额'],数据库的列名为:['date', 'open', 'high', 'low', 'close', 'volume', 'amount'],所以需要修改。

插入数据库是,需要重设 dataFrame index 为数据库列名的 date ,df.set_index(['date'], inplace=True)。

话不多说,直接打码

ak_stock_data.py

import time import akshare as ak import stock_config as scg import stock_constant as sct import stock_db as sdb def stock_code_net_to_csv(): """ 获取A股股票代码,并保存到data/xxx.csv文件中 新浪日行情数据需要:沪交所股票代码,代码添加前缀sh,深交所股票代码,代码添加前缀sz :return: [上交所DataFrame,深交所DataFrame] """ stock_sh_a_spot_em_df = ak.stock_sh_a_spot_em() # 修改股票代码前缀 stock_sh_a_spot_em_df['代码'] = \ stock_sh_a_spot_em_df['代码'].apply(lambda _: str(_)) # stock_sh_a_spot_em_df['代码'].apply(lambda x: "{}{}".format('sh', x)) # 保存 stock_sh_a_spot_em_df.to_csv(sct.sh_code_path) stock_sz_a_spot_em_df = ak.stock_sz_a_spot_em() # 修改股票代码前缀 stock_sz_a_spot_em_df['代码'] = \ stock_sz_a_spot_em_df['代码'].apply(lambda _: str(_)) # stock_sz_a_spot_em_df['代码'].apply(lambda x: "{}{}".format('sz', x)) # 保存 stock_sz_a_spot_em_df.to_csv(sct.sz_code_path) return stock_sh_a_spot_em_df, stock_sz_a_spot_em_df def start(): """ 量化投资程序入口 :return: """ config_df = scg.read_config() # 如果配置是空,则获取沪深A股信息,获取code,保存到配置 if config_df.empty: sh_df, sz_df = stock_code_net_to_csv() # 补全本地配置信息 for index, row in sh_df.iterrows(): config_df.loc[len(config_df), config_df.columns] = (row['代码'], row['名称'], sct.start_date, 0) for index, row in sz_df.iterrows(): config_df.loc[len(config_df), config_df.columns] = (row['代码'], row['名称'], sct.start_date, 0) # 保存到本地 scg.save_config(config_df) print('初始化配置信息,并保存到本地成功...') else: print('已经初始化过本地配置...') # 开始更新日行情数据 update_stock_zh_a_daily_eastmoney() def update_stock_zh_a_daily_eastmoney(): """ 东方财富日行情数据,沪深A股,先从本地配置获取股票代码,再获取日行情数据 获取成功或失败,记录到本地数据,以便股票数据更新完整 :return: """ success_code_list = [] except_code_list = [] # 读取配置信息 config_df = scg.read_config() if config_df.empty: print('配置信息错误,请检查...') return for index, row in config_df.iterrows(): code = row['code'] start_time = row['daily_update_time'] end_time = time.strftime('%Y%m%d', time.localtime()) try: except_code = str(code) df = ak.stock_zh_a_hist( symbol=str(code), start_date=start_time, end_date=end_time, adjust="qfq") except: except_code_list.append(except_code) # 更新配置信息config_df config_df.loc[config_df['code'] == code, 'error_daily_update_count'] \ = row['error_daily_update_count'] + 1 print("发生异常code ", except_code) continue print('成功获取股票: index->{} {}日行情数据'.format(index, code), ' 开始时间: {} 结束时间: {}'.format(start_time, end_time)) if df.empty: continue # 获取对应的子列集 sub_df = df[['日期', '开盘', '最高', '最低', '收盘', '成交量', '成交额']] # net_df 的列名可能和数据库列名不一样,修改列名对应数据库的列名 sub_df.columns = ['date', 'open', 'high', 'low', 'close', 'volume', 'amount'] # 修改 index 为 date 去掉默认的 index 便于直接插入数据库 sub_df.set_index(['date'], inplace=True) sub_df.insert(sub_df.shape[1], 'code', str(code)) sdb.to_table(sub_df, "stock_daily_price") # 更新配置信息config_df config_df.loc[config_df['code'] == code, 'daily_update_time'] = end_time config_df.loc[config_df['code'] == code, 'error_daily_update_count'] = 0 # 间隔更新到本地配置 if index % 100 == 0: scg.save_config(config_df) print('index: {} 更新本地配置一次...'.format(index)) success_code_list.append(code) print(sub_df) # 同步配置到本地 scg.save_config(config_df) print('更新本地配置成功...') print("成功请求的code: ", success_code_list) print("错误请求code: ", except_code_list) if __name__ == '__main__': start() pass

数据库效果图:

日志效果图: 从最后打印出来的日志可以看出,4千多个股票,绝大部分数据请求是成功的,并保存到数据库中,也更新到本地股票配置信息中 config.csv,方便下次增量下载日行情数据。

因为有了增量更新功能,再跑的时候效果是这样子的

本地数据查询

至此,网络数据本地化已经完成,俗话说:巧妇难为无米之炊,现在米已经在本地了,可以磨刀霍霍搞事情了。 所以封装一下从本地获取数据的接口:stock_daily(code, start_time, end_time)。只需要传一个股票代码,开始时间,结束时间,就可以从本地获取某股票某时间段的股票日行情数据。

数据库封装代码:

stock_db.py

import pandahouse as ph import time ''' 数据的存储和获取 CREATE TABLE stock.stock_daily_price ( `date` Date, `code` String, `open` Float32, `high` Float32, `low` Float32, `close` Float32, `volume` Float64, `amount` Float64 -- `adj_factor` Int32, -- `st_status` Int16, -- `trade_status` Int16 ) ENGINE = ReplacingMergeTree() ORDER BY (javaHash(code), date) ''' connection = dict(database="stock", host="http://localhost:8123", user='default', password='sykent') def to_table(data, table): """ 插入数据到表 :param data: :param table: :return: """ affected_rows = ph.to_clickhouse(data, table=table, connection=connection) return affected_rows def from_table(sql): """ 查询表 :param sql: :return: dataframe """ last_time = time.time() df = ph.read_clickhouse(sql, connection=connection) print("db-> 耗时: {} sql: {}".format((time.time() - last_time) * 1000, sql)) return df def stock_daily(code, start_time, end_time): """ 获取某股票,某时间段的日行情数据 select * from stock_daily_price where code == '000001' and date between '2022-03-30' and '2022-07-29' :param code: :param start_time: :param end_time: :return: """ sql = "select * from stock.stock_daily_price where code == '{}' and date between '{}' and '{}'" \ .format(code, start_time, end_time) return from_table(sql) def all_stock_daily(start_time, end_time): """ 获取所有股票某时间段的日行情数据 select * from stock.stock_daily_price where date between '2022-03-30' and '2022-07-29' :param start_time: :param end_time: :return: """ sql = "select * from stock.stock_daily_price where date between '{}' and '{}'" \ .format(start_time, end_time) return from_table(sql)

效果图所示,获取中国平安本地化 2022 年的全部日行情数据,只需要 35ms ,是不是很快。

下篇预告

获取本地日行情数据,k 线可视化,已经封装完成,只需要一句代码即可画出漂亮 k 线图。

效果图:

完稿于 2022 年 09 月 04 日 14:26:10

https://www.suoduoma.com

上一篇:炒股超短线技巧(超短线的几种绝招)

下一篇:手机炒股(金太阳手机炒股软件)

相关推荐

返回顶部