# -*- coding: utf-8 -*-
"""
1. 程序目的
(1) 调用climate indices库计算不同时间尺度的spei
2. 2023年12月15日 Version 1
3. 数据
根路径:'E:\05_Study\05_DroughtIndex\02_SPEI\'
3.1 输入数据
'\01_Data\'路径下的气象数据
3.2 输出数据
'\03_Result\'路径下的数据
4.
"""
# %% 包的导入
# 包的导入
import os
import numpy as np
import pandas as pd
from climate_indices import indices
from climate_indices import compute # 计算SPEI的包
# %% 获取某一路径下特定格式的文件
def get_file(
inpath: str,
type_str: str
) -> list:
"""
(1) 功能:
1) 获取某一路径下特定格式的文件
----------
(2) 输入参数
1) inpath: str
文件存储绝对路径
2) type_str: str
文件后缀名
----------
(3) 输出参数
1) file_list: list
特定文件格式的数据
"""
file_items = os.listdir(inpath)
file_list = []
file_path_list = []
for ii in file_items:
if ii.endswith(type_str):
file_list.append(ii)
file_path_list.append(inpath+ii)
return file_list,file_path_list
# %% 数据读取函数定义
def read_oridata(
filepath: str
):
"""
(1) 功能:
读取用于计算SPEI的气象数据
注:此函数读取的是测试数据,也可以读取按照相同方式存储的气象数据
---
(2) 参数:
1) filepath: str,输入数存储路径
---
(3) 返回值
1) sta_id: int,站点号
2) styr: int, 开始年
3) edyr: int, 结束年
4) lat: float, 站点纬度
5)lon: float, 站点经度
6) alt: float, 站点海拔
7) tas_mean: np.array, 平均温
8) pre: np.array, 降水
"""
# 数据读取
climdata = pd.read_csv(filepath)
# 站点号
sta_id = climdata.Sta_ID[0]
# 开始年和结束年
styr = climdata.Year[0]
edyr = max(climdata.Year)
# 纬度
lat = climdata.Lat[0]
# 经度
lon = climdata.Lon[0]
# 海拔
alt = climdata.Alt[0]
# 平均温
tas_mean = np.asarray(climdata.T_Mean)
# 降水
pre = np.asarray(climdata.Pre_2020)
return sta_id,styr,edyr,lat,lon,alt,tas_mean,pre
# %% 不同时间尺度spei计算函数定义
def dif_scale_spei(
sta_id: int,
lat: float,
lon: float,
alt: float,
tas_data: np.array,
pre_data: np.array,
styr: int,
edyr: int,
scale: tuple
) -> pd.DataFrame:
"""
(1) 功能:
1) 计算单一站点不同时间尺度的spei
---
(2) 参数
1) sta_id: int, 站点号
2) lat: float, 站点纬度
3) lon: float, 站点经度
4) alt: float, 站点海拔
5) tas_data: np.array, 平均温序列,月值
6) pre_data: np.array, 降水序列,月值
7) styr: int, 开始年
8) edyr: int, 结束年
9) scale: tuple, spei的时间尺度
---
(3) 返回值
1) spei_df: pd.DataFrame, 不同时间尺度的SPEI
列名:站点号,纬度,经度,海拔,年,月,不同时间尺度的SPEI
注:SPEI计算结果中的-99表示无效值
"""
# 创建存储计算结果的Dataframe(站点号,年,月,不同时间尺度的SPEI)
# 创建列名
scale = sorted(scale)
scale_list = []
for scale_temp in scale:
if scale_temp < 10:
scale_list.append('Scale_0'+str(scale_temp))
else:
scale_list.append('Scale_'+str(scale_temp))
col_names = ['Sta_ID','Lat','Lon','Alt','Year','Month'] + scale_list
# 空的数据框的创建
spei_df = pd.DataFrame(data=[],columns=col_names)
# 潜在蒸散发计算-桑斯维特方法
pet_data = indices.pet(temperature_celsius=tas_data,
latitude_degrees=lat,
data_start_year=styr)
# 不同时间尺度的spei的计算
for scale_temp in scale:
spei = indices.spei(precips_mm=pre_data,
pet_mm=pet_data,
scale=scale_temp, # 时间尺度
distribution=indices.Distribution.gamma,
periodicity=compute.Periodicity.monthly,
data_start_year=styr,
calibration_year_initial=styr,
calibration_year_final=edyr,
)
spei[np.isnan(spei)] = -99 # nan转换为-99
if scale_temp < 10:
spei_df['Scale_0'+str(scale_temp)]=spei
else:
spei_df['Scale_'+str(scale_temp)]=spei
# 站点信息补充
spei_df['Sta_ID'] = sta_id
# 生成年月序列
year_list = []
month_list = []
for year in range(styr,edyr+1):
for month in range(1,13):
year_list.append(year)
month_list.append(month)
spei_df['Year'] = year_list
spei_df['Month'] = month_list
spei_df['Lat'] = lat
spei_df['Lon'] = lon
spei_df['Alt'] = alt
return spei_df
# %%
if __name__ == '__main__':
# %% 路径处理和变量预定义
rootdir = r'E:\05_Study\05_DroughtIndex\02_SPEI'
inpath = rootdir + '\\01_Data\\'
outpath = rootdir + '\\03_Result\\'
type_str = '.csv'
scale_aim = (1,2,3,4,5,6,7,8,9,10,11,12,24)
# %% 调用get_file()函数,获取所有文件信息
file_list,file_list_path = get_file(inpath,type_str)
# %% 逐站点计算SPEI
ii = 0
for file_path_temp in file_list_path:
# %% 调用read_oridata()函数,获取特定站点数据
sta_id,styr,edyr,lat,lon,alt,tas_mean,pre = read_oridata(file_path_temp)
# %% 调用dif_scale_spei()函数,计算不同时间尺度的spei
spei_dif_scale = dif_scale_spei(sta_id,lat,lon,alt,tas_mean,pre,styr,edyr,scale=scale_aim)
#outfile = str(int(sta_id)) + '_SPEI.xlsx'
outfile = file_list[ii].replace('MonthData.csv','SPEI_V2.xlsx')
outfile_path = outpath + outfile
spei_dif_scale.to_excel(outfile_path,index=False,sheet_name=str(int(sta_id)))
print(str(sta_id)+' has been finished.')
ii = ii + 1
print('Finished.')