量化研究---小果全球大类低相干性动量趋势增强轮动策略实盘设置 ...

打印 上一主题 下一主题

主题 2009|帖子 2009|积分 6027

本日完成了宽邦策略交易系统,非常不错,对接策略交易




策略设置,支持多策略同时运行
  1. {       "账户公钥":"36DMn",    "账户私钥":"ktryfMGOhEaKrOoKxo56T2AI7rTO13NlGShoZq5DHp",    "测试说明":"开启测试就是选择历史交易不开启就是选择今天的数据",    "是否开启测试":"否",    "测试数量":200,    "跟单设置":"跟单设置***********",    "账户跟单比例":0.5,    "多策略用逗号隔开":"多策略用逗号隔开********",    "组合名称":["综合因子评分选股策略"],    "组合id":["72ca4f79-4d38-4440-9c0f"],    "组合跟单比例":[1],    "不同策略间隔更新时间":0,    "下单默认说明":"默认/金额/数量",    "下单模式":"金额",    "下单值":1000}
复制代码
小果全球大类低相干性动量趋势增强轮动策略实盘,我查抄了一下代码,优化了一下代码数据的盘算
大qmt学习视频
量化研究---小果全球大类低相干性动量趋势增强轮动策略实盘设置
https://mp.weixin.qq.com/s/-dB75iFetRiYiB5L81J89g
1增补汗青汗青数据在操作的下面


可以点击回测开开回测的数据结果


开始回测


回测的结果


测试数据






实盘设置输入账户点击编辑进入




输入自定义交易金额,每一个标的交易多少


支持自定义股票池


挂模子交易实盘


回测源代码


  1. #encoding:gbk'''小果全球大类低相关性动量趋势增强轮动策略回测实盘知识星球下载作者:小果量化微信:xg_quant时间:20250416'''import pandas as pdimport numpy as npimport talibdef init(c):#账户        c.account=''#账户类型        c.account_type='STOCK'#开始时间        c.start='20241201 00:00:00'#结束时间        c.end='20500101 00:00:00'#测试资金        c.capital=300000        c.stock_list=['513100.SH', '511130.SH', '159937.SZ', '513350.SH', '512890.SH', '159915.SZ', '513500.SH', '159985.SZ', '159981.SZ','159980.SZ', '513300.SH', '159680.SZ', '511090.SH', '513400.SH', '159934.SZ']        c.name_list=['纳斯达克ETF', '30年债券ETF', '黄金ETF', '标普油气ETF','红利ETF', '创业板ETF', '标普500ETF', '豆粕ETF', '能源化工ETF','有色ETF', '沪深300ETF', '中证100ETF', '30年债券ETF', '道琼斯ETF', '黄金ETF']#时间窗口        c.windows=3#最小的周期数量        c.min_amount=9#当前的周期        c.last_amount=3#5日均线        c.mean_line=5#最低分        c.min_score=25#买入排名        c.rank=5#交易百分比,可以采用动态百分比安持股数量分配        c.buy_ratio=0.2        c.sell_ratio=0        c.period_1='60m'        c.df=pd.DataFrame()        c.df['证券代码']=c.stock_list        c.df['名称']=c.name_listprint(c.df)#动量因子天数#老版本的回测需要设定股票池,配合历史数据使用        c.set_universe(c.stock_list)def simple_volatility(prices, window=20, annualize=True):"""        计算简单历史波动率        :param prices: 价格序列        :param window: 计算窗口        :param annualize: 是否年化        :return: 波动率序列        """        returns = prices.pct_change().dropna()        vol= returns.rolling(window).std()if annualize:                vol = vol * np.sqrt(252)  # 股票市场通常用252个交易日return vol.tolist()[-1]def handlebar(c):#当前K线索引号        d=c.barpos        df=c.df#获取当前K线日期#精确到分钟小周期,不然有未来函数        today_1=timetag_to_datetime(c.get_bar_timetag(d),'%Y%m%d%H%M%S')#日线#today_1=timetag_to_datetime(c.get_bar_timetag(d),'%Y-%m-%d)print(today_1,"当前时间*****")#必须使用前复权#hist=c.get_history_data(100,'1d',['open','close','low','high'],1)#持股        hold_stock=get_position(c,c.account,c.account_type)        account=get_account(c,c.account,c.account_type)if hold_stock.shape[0]>0:                hold_stock=hold_stock[hold_stock['持仓量']>=10]if hold_stock.shape[0]>0:                        hold_stock_list=hold_stock['证券代码'].tolist()                        hold_amount=hold_stock.shape[0]else:                        hold_stock_list=[]                        hold_amount=0else:                hold_stock_list=[]                hold_amount=0        df=c.dfif df.shape[0]>0:                total_amount_list=[]                last_amount_list=[]                score_list=[]                up_line_list=[]                volatility_list=[]for stock in df['证券代码'].tolist():try:                                hist=c.get_market_data_ex(                                fields=[],                                 stock_code=[stock],                                 period=c.period_1,                                 start_time=str(c.start)[:8],                                 end_time=today_1,                                 count=-1,                                 fill_data=True,                                 subscribe=True)                                hist=hist[stock]                                volatility=simple_volatility(prices=hist['close'], window=20, annualize=True)                                df1=six_pulse_excalibur_hist(c,hist)                                df1['均线']=df1['close'].rolling(c.mean_line).mean()                                df1['站上均线']=df1['close']>df1['均线']                                score=mean_line_models(c,close_list=df1['close'].tolist())                                df1=df1[-c.windows:]                                up=df1['站上均线'].tolist()[-1]                                total_amount=sum(df1['signal'].tolist())                                last_amount=df1['signal'].tolist()[-1]                                total_amount_list.append(total_amount)                                last_amount_list.append(last_amount)                                score_list.append(score)                                up_line_list.append(up)                                volatility_list.append(volatility)except Exception as e:print(e,stock,'有问题')                                total_amount=None                                last_amount=None                                score=None                                volatility=None                                total_amount_list.append(total_amount)                                last_amount_list.append(last_amount)                                score_list.append(score)                                up_line_list.append(None)                                volatility_list.append(0)
  2.                 df['数量']=total_amount_list                df['最新数量']=last_amount_list                df['分数']=score_list                df['站上均线']=up_line_list                df['波动率']=volatility_list#选择数据                df=df[df['数量']>=c.min_amount]                df=df[df['最新数量']>=c.last_amount]                df=df[df['分数']>=c.min_score]                df=df[df['站上均线']==True]#安总数量,波动率排序print(df)                df=df.sort_values(by=['数量','分数'], ascending=False)print('分析数据***********************')print(df)                rank_stock_list=df['证券代码'][:c.rank].tolist()                buy_list=[]                sell_list=[]#买入for stock in rank_stock_list:if stock in hold_stock_list:print(stock,'买入分析在持有排名股票池继续持有')else:print(stock,'买入分析持有股票池排名没有在持股买入')                                buy_list.append(stock)#卖出for stock in hold_stock_list:if stock in rank_stock_list:print(stock,'卖出分析在持有排名股票池继续持有')else:print(stock,'卖出分析不在持有排名股票池卖出')                                sell_list.append(stock)for stock in sell_list:                        order_target_percent(stock, c.sell_ratio, c, c.account)print('{} 卖出股票{} '.format(today_1,stock))for stock in buy_list:                        order_target_percent(stock, c.buy_ratio, c, c.account)print('{} 买入股票{} '.format(today_1,stock))def mean_line_models(c,close_list=[],x1=3,x2=5,x3=10,x4=15,x5=20):'''        均线模型        趋势模型        5,10,20,30,60        '''        df=pd.DataFrame()        df['close']=close_list#df=self.bond_cov_data.get_cov_bond_hist_data(stock=stock,start=start_date,end=end_date,limit=1000000000)        df1=pd.DataFrame()        df1['x1']=df['close'].rolling(window=x1).mean()        df1['x2']=df['close'].rolling(window=x2).mean()        df1['x3']=df['close'].rolling(window=x3).mean()        df1['x4']=df['close'].rolling(window=x4).mean()        df1['x5']=df['close'].rolling(window=x5).mean()        score=0#加分的情况        mean_x1=df1['x1'].tolist()[-1]        mean_x2=df1['x2'].tolist()[-1]        mean_x3=df1['x3'].tolist()[-1]        mean_x4=df1['x4'].tolist()[-1]        mean_x5=df1['x5'].tolist()[-1]#相邻2个均线进行比较if mean_x1>=mean_x2:                score+=25if mean_x2>=mean_x3:                score+=25if mean_x3>=mean_x4:                score+=25if mean_x4>=mean_x5:                score+=25return scoredef six_pulse_excalibur_hist(c,df):
  3.         markers=0        signal=0#df=self.data.get_hist_data_em(stock=stock)        CLOSE=df['close']        LOW=df['low']        HIGH=df['high']        DIFF=EMA(CLOSE,8)-EMA(CLOSE,13)        DEA=EMA(DIFF,5)#如果满足DIFF>DEA 在1的位置标记1的图标#DRAWICON(DIFF>DEA,1,1);        markers+=IF(DIFF>DEA,1,0)#如果满足DIFF<DEA 在1的位置标记2的图标#DRAWICON(DIFF<DEA,1,2);        markers+=IF(DIFF<DEA,1,0)#DRAWTEXT(ISLASTBAR=1,1,'. MACD'),COLORFFFFFF;{微信公众号:尊重市场}        ABC1=DIFF>DEA        signal+=IF(ABC1,1,0)        尊重市场1=(CLOSE-LLV(LOW,8))/(HHV(HIGH,8)-LLV(LOW,8))*100        K=SMA(尊重市场1,3,1)        D=SMA(K,3,1)#如果满足k>d 在2的位置标记1的图标        markers+=IF(K>D,1,0)#DRAWICON(K>D,2,1);        markers+=IF(K<D,1,0)#DRAWICON(K<D,2,2);#DRAWTEXT(ISLASTBAR=1,2,'. KDJ'),COLORFFFFFF;        ABC2=K>D        signal+=IF(ABC2,1,0)        指标营地=REF(CLOSE,1)        RSI1=(SMA(MAX(CLOSE-指标营地,0),5,1))/(SMA(ABS(CLOSE-指标营地),5,1))*100        RSI2=(SMA(MAX(CLOSE-指标营地,0),13,1))/(SMA(ABS(CLOSE-指标营地),13,1))*100        markers+=IF(RSI1>RSI2,1,0)#DRAWICON(RSI1>RSI2,3,1);        markers+=IF(RSI1<RSI2,1,0)#DRAWICON(RSI1<RSI2,3,2);#DRAWTEXT(ISLASTBAR=1,3,'. RSI'),COLORFFFFFF;        ABC3=RSI1>RSI2        signal+=IF(ABC3,1,0)        尊重市场=-(HHV(HIGH,13)-CLOSE)/(HHV(HIGH,13)-LLV(LOW,13))*100        LWR1=SMA(尊重市场,3,1)        LWR2=SMA(LWR1,3,1)#DRAWICON(LWR1>LWR2,4,1);        markers+=IF(LWR1>LWR2,1,0)#DRAWICON(LWR1<LWR2,4,2);        markers+=IF(LWR1<LWR2,1,0)#DRAWTEXT(ISLASTBAR=1,4,'. LWR'),COLORFFFFFF;        ABC4=LWR1>LWR2        signal+=IF(ABC4,1,0)        BBI=(MA(CLOSE,3)+MA(CLOSE,5)+MA(CLOSE,8)+MA(CLOSE,13))/4#DRAWICON(CLOSE>BBI,5,1);        markers+=IF(CLOSE>BBI,1,0)#DRAWICON(CLOSE<BBI,5,2);        markers+=IF(CLOSE<BBI,1,0)#DRAWTEXT(ISLASTBAR=1,5,'. BBI'),COLORFFFFFF;        ABC10=7        ABC5=CLOSE>BBI        signal+=IF(ABC5,1,0)        MTM=CLOSE-REF(CLOSE,1)        MMS=100*EMA(EMA(MTM,5),3)/EMA(EMA(ABS(MTM),5),3)        MMM=100*EMA(EMA(MTM,13),8)/EMA(EMA(ABS(MTM),13),8)        markers+=IF(MMS>MMM,1,0)#DRAWICON(MMS>MMM,6,1);        markers+=IF(MMS<MMM,1,0)#DRAWICON(MMS<MMM,6,2);#DRAWTEXT(ISLASTBAR=1,6,'. ZLMM'),COLORFFFFFF;        ABC6=MMS>MMM        signal+=IF(ABC6,1,0)        df['signal']=signal        df['markers']=markersreturn dfdef get_account(c,accountid,datatype):'''        获取账户数据        '''        accounts = get_trade_detail_data(accountid, datatype, 'account')        result={}for dt in accounts:                result['总资产']=dt.m_dBalance                result['净资产']=dt.m_dAssureAsset                result['总市值']=dt.m_dInstrumentValue                result['总负债']=dt.m_dTotalDebit                result['可用金额']=dt.m_dAvailable                result['盈亏']=dt.m_dPositionProfitreturn result#获取持仓信息{code.market:手数}def get_position(c,accountid,datatype):'''        获取持股数据        '''        positions = get_trade_detail_data(accountid,datatype, 'position')        data=pd.DataFrame()if len(positions)>0:                df=pd.DataFrame()for dt in positions:                        df['股票代码']=[dt.m_strInstrumentID]                        df['市场类型']=[dt.m_strExchangeID]                        df['证券代码']=df['股票代码']+'.'+df['市场类型']                        df['证券名称']=[dt.m_strInstrumentName]                        df['持仓量']=[dt.m_nVolume]                        df['可用数量']=[dt.m_nCanUseVolume]                        df['成本价']=[dt.m_dOpenPrice]                        df['市值']=[dt.m_dInstrumentValue]                        df['持仓成本']=[dt.m_dPositionCost]                        df['盈亏']=[dt.m_dPositionProfit]                        data=pd.concat([data,df],ignore_index=True)else:                data=pd.DataFrame()return data def RD(N,D=3):   #四舍五入取3位小数 return np.round(N,D)        def RET(S,N=1):  #返回序列倒数第N个值,默认返回最后一个return np.array(S)[-N]      def ABS(S):  #返回N的绝对值    return np.abs(S)def MAX(S1,S2): #序列max return np.maximum(S1,S2)    def MIN(S1,S2):           #序列minreturn np.minimum(S1,S2)   def IF(S,A,B): #序列布尔判断 return=A  if S==True  else  B  return np.where(S,A,B)      def AND(S1,S2):#andreturn np.logical_and(S1,S2)def OR(S1,S2):#orreturn np.logical_or(S1,S2)def RANGE(A,B,C):'''        期间函数        B<=A<=C        '''        df=pd.DataFrame()        df['select']=A.tolist()        df['select']=df['select'].apply(lambda x: True if (x>=B and x<=C) else False)return df['select']
  4. def REF(S, N=1):          #对序列整体下移动N,返回序列(shift后会产生NAN)    return pd.Series(S).shift(N).values  
  5. def DIFF(S, N=1):         #前一个值减后一个值,前面会产生nan return pd.Series(S).diff(N).values     #np.diff(S)直接删除nan,会少一行
  6. def STD(S,N):             #求序列的N日标准差,返回序列    return  pd.Series(S).rolling(N).std(ddof=0).values     
  7. def SUM(S, N):            #对序列求N天累计和,返回序列    N=0对序列所有依次求和         return pd.Series(S).rolling(N).sum().values if N>0 else pd.Series(S).cumsum().values  
  8. def CONST(S):             #返回序列S最后的值组成常量序列return np.full(len(S),S[-1]) def HHV(S,N):             #HHV(C, 5) 最近5天收盘最高价        return pd.Series(S).rolling(N).max().values     
  9. def LLV(S,N):             #LLV(C, 5) 最近5天收盘最低价     return pd.Series(S).rolling(N).min().values    
  10. def HHVBARS(S,N):         #求N周期内S最高值到当前周期数, 返回序列return pd.Series(S).rolling(N).apply(lambda x: np.argmax(x[::-1]),raw=True).values 
  11. def LLVBARS(S,N):         #求N周期内S最低值到当前周期数, 返回序列return pd.Series(S).rolling(N).apply(lambda x: np.argmin(x[::-1]),raw=True).values    
  12. def MA(S,N):              #求序列的N日简单移动平均值,返回序列                    return pd.Series(S).rolling(N).mean().values  
  13. def EMA(S,N):             #指数移动平均,为了精度 S>4*N  EMA至少需要120周期     alpha=2/(span+1)    return pd.Series(S).ewm(span=N, adjust=False).mean().values     
  14. def SMA(S, N, M=1):       #中国式的SMA,至少需要120周期才精确 (雪球180周期)    alpha=1/(1+com)    return pd.Series(S).ewm(alpha=M/N,adjust=False).mean().values           #com=N-M/M
  15. def DMA(S, A):            #求S的动态移动平均,A作平滑因子,必须 0<A<1  (此为核心函数,非指标)return pd.Series(S).ewm(alpha=A, adjust=True).mean().values
  16. def WMA(S, N):            #通达信S序列的N日加权移动平均 Yn = (1*X1+2*X2+3*X3+...+n*Xn)/(1+2+3+...+Xn)return pd.Series(S).rolling(N).apply(lambda x:x[::-1].cumsum().sum()*2/N/(N+1),raw=True).values 
  17. def AVEDEV(S, N):         #平均绝对偏差  (序列与其平均值的绝对差的平均值)   return pd.Series(S).rolling(N).apply(lambda x: (np.abs(x - x.mean())).mean()).values 
  18. def SLOPE(S, N):          #返S序列N周期回线性回归斜率            return pd.Series(S).rolling(N).apply(lambda x: np.polyfit(range(N),x,deg=1)[0],raw=True).values
  19. def FORCAST(S, N):        #返回S序列N周期回线性回归后的预测值, jqz1226改进成序列出    return pd.Series(S).rolling(N).apply(lambda x:np.polyval(np.polyfit(range(N),x,deg=1),N-1),raw=True).values  
  20. def LAST(S, A, B):        #从前A日到前B日一直满足S_BOOL条件, 要求A>B & A>0 & B>=0 return np.array(pd.Series(S).rolling(A+1).apply(lambda x:np.all(x[::-1][B:]),raw=True),dtype=bool) #------------------   1级:应用层函数(通过0级核心函数实现) ----------------------------------def COUNT(S, N):                       # COUNT(CLOSE>O, N):  最近N天满足S_BOO的天数  True的天数return SUM(S,N)    
  21. def EVERY(S, N):                       # EVERY(CLOSE>O, 5)   最近N天是否都是Truereturn  IF(SUM(S,N)==N,True,False)                     def EXIST(S, N):                       # EXIST(CLOSE>3010, N=5)  n日内是否存在一天大于3000点  return IF(SUM(S,N)>0,True,False)
  22. def FILTER(S, N):                      # FILTER函数,S满足条件后,将其后N周期内的数据置为0, FILTER(C==H,5)for i in range(len(S)): S[i+1:i+1+N]=0  if S[i] else S[i+1:i+1+N]        return S                           # 例:FILTER(C==H,5) 涨停后,后5天不再发出信号 
  23. def BARSLAST(S):                       #上一次条件成立到当前的周期, BARSLAST(C/REF(C,1)>=1.1) 上一次涨停到今天的天数         M=np.concatenate(([0],np.where(S,1,0)))  for i in range(1, len(M)):  M[i]=0 if M[i] else M[i-1]+1    return M[1:]                       
  24. def BARSLASTCOUNT(S):                  # 统计连续满足S条件的周期数        by jqz1226        rt = np.zeros(len(S)+1)            # BARSLASTCOUNT(CLOSE>OPEN)表示统计连续收阳的周期数for i in range(len(S)): rt[i+1]=rt[i]+1  if S[i] else rt[i+1]return rt[1:]  
  25. def BARSSINCEN(S, N):                  # N周期内第一次S条件成立到现在的周期数,N为常量  by jqz1226return pd.Series(S).rolling(N).apply(lambda x:N-1-np.argmax(x) if np.argmax(x) or x[0] else 0,raw=True).fillna(0).values.astype(int)
  26. def CROSS(S1, S2):                     #判断向上金叉穿越 CROSS(MA(C,5),MA(C,10))  判断向下死叉穿越 CROSS(MA(C,10),MA(C,5))   return np.concatenate(([False], np.logical_not((S1>S2)[:-1]) & (S1>S2)[1:]))    # 不使用0级函数,移植方便  by jqz1226def CROSS_UP(S1, S2):                     #判断向上金叉穿越 CROSS(MA(C,5),MA(C,10))  判断向下死叉穿越 CROSS(MA(C,10),MA(C,5))   return np.concatenate(([False], np.logical_not((S1>S2)[:-1]) & (S1>S2)[1:]))    # 不使用0级函数,移植方便  by jqz1226def CROSS_DOWN(S1, S2):                     return np.concatenate(([False], np.logical_not((S1<S2)[:-1]) & (S1<S2)[1:]))    # 不使用0级函数,移植方便  by jqz1226
  27. def LONGCROSS(S1,S2,N):                #两条线维持一定周期后交叉,S1在N周期内都小于S2,本周期从S1下方向上穿过S2时返回1,否则返回0         return  np.array(np.logical_and(LAST(S1<S2,N,1),(S1>S2)),dtype=bool)            # N=1时等同于CROSS(S1, S2)
  28. def VALUEWHEN(S, X):                   #当S条件成立时,取X的当前值,否则取VALUEWHEN的上个成立时的X值   by jqz1226return pd.Series(np.where(S,X,np.nan)).ffill().values        
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

守听

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表