# Module: others # 获取当前日期字符串 def get_date(bar=True): import datetime datetime_date = str(datetime.date.today()) if bar==False: datetime_date = datetime_date.replace('-', '') return datetime_date # 获取当前时间字符串 def get_time(colon=True): import datetime datetime_time = datetime.datetime.now().strftime('%H:%M:%S') if colon==False: datetime_time = datetime_time.replace(':', '') return datetime_time # 自动先后运行程序 def run_programs_sequentially(program_files=['./a.py', './b.py'], execute='python ', show_time=0): import os import time if show_time == 1: start = time.time() i0 = 0 for program_file in program_files: i0 += 1 if show_time == 1: start_0 = time.time() os.system(execute+program_file) if show_time == 1: end_0 = time.time() print('Running time of program_'+str(i0)+' = '+str((end_0-start_0)/60)+' min') if show_time == 1: end = time.time() print('Total running time = '+str((end-start)/60)+' min') # 获取CPU使用率 def get_cpu_usage(interval=1): import psutil cpu_usage = psutil.cpu_percent(interval=interval) return cpu_usage # 获取内存信息 def get_memory_info(): import psutil memory_info = psutil.virtual_memory() total_memory = memory_info.total/(1024**2) used_memory = memory_info.used/(1024**2) available_memory = memory_info.available/(1024**2) used_memory_percent = memory_info.percent return total_memory, used_memory, available_memory, used_memory_percent # 将WordPress导出的XML格式文件转换成多个MarkDown格式的文件 def convert_wordpress_xml_to_markdown(xml_file='./a.xml', convert_content=1, replace_more=[]): import xml.etree.ElementTree as ET import re tree = ET.parse(xml_file) root = tree.getroot() for item in root.findall('.//item'): title = item.find('title').text content = item.find('.//content:encoded', namespaces={'content': 'http://purl.org/rss/1.0/modules/content/'}).text if convert_content == 1: content = re.sub(r'', '', content) content = content.replace('

', '') content = content.replace('

', '') content = content.replace('
    ', '') content = content.replace('
', '') content = content.replace('', '') content = content.replace('', '') content = content.replace('', '') content = content.replace('', '') content = content.replace('
  • ', '+ ') content = content.replace('', '') content = re.sub(r'', '## ', content) content = re.sub(r'', '### ', content) content = re.sub(r'', '#### ', content) for replace_item in replace_more: content = content.replace(replace_item, '') for _ in range(100): content = content.replace('\n\n\n', '\n\n') else: pass markdown_content = f"# {title}\n{content}" markdown_file_path = f"{title}.md" cleaned_filename = re.sub(r'[/:*?"<>|\'\\]', ' ', markdown_file_path) with open(cleaned_filename, 'w', encoding='utf-8') as md_file: md_file.write(markdown_content) # 获取运行的日期和时间并写入文件 def statistics_with_day_and_time(content='', filename='a', file_format='.txt'): import datetime datetime_today = str(datetime.date.today()) datetime_time = datetime.datetime.now().strftime('%H:%M:%S') with open(filename+file_format, 'a', encoding="utf-8") as f2: if content == '': f2.write(datetime_today+' '+datetime_time+'\n') else: f2.write(datetime_today+' '+datetime_time+' '+content+'\n') # 统计Python文件中import的数量并排序 def count_number_of_import_statements(filename, file_format='.py', num=1000): with open(filename+file_format, 'r') as file: lines = file.readlines() import_array = [] for line in lines: if 'import ' in line: line = line.strip() import_array.append(line) from collections import Counter import_statement_counter = Counter(import_array).most_common(num) return import_statement_counter # 获取本月的所有日期 def get_days_of_the_current_month(str_or_datetime='str'): import datetime today = datetime.date.today() first_day_of_month = today.replace(day=1) if first_day_of_month.month == 12: next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1) else: next_month = first_day_of_month.replace(month=first_day_of_month.month + 1) current_date = first_day_of_month day_array = [] while current_date < next_month: if str_or_datetime=='str': day_array.append(str(current_date)) elif str_or_datetime=='datetime': day_array.append(current_date) current_date += datetime.timedelta(days=1) return day_array # 获取上个月份 def get_last_month(): import datetime today = datetime.date.today() last_month = today.month - 1 if last_month == 0: last_month = 12 year_of_last_month = today.year - 1 else: year_of_last_month = today.year return year_of_last_month, last_month # 获取上上个月份 def get_the_month_before_last(): import datetime today = datetime.date.today() the_month_before_last = today.month - 2 if the_month_before_last == 0: the_month_before_last = 12 year_of_the_month_before_last = today.year - 1 else: year_of_the_month_before_last = today.year if the_month_before_last == -1: the_month_before_last = 11 year_of_the_month_before_last = today.year - 1 else: year_of_the_month_before_last = today.year return year_of_the_month_before_last, the_month_before_last # 获取上个月的所有日期 def get_days_of_the_last_month(str_or_datetime='str'): import datetime import guan today = datetime.date.today() year_of_last_month, last_month = guan.get_last_month() first_day_of_month = today.replace(year=year_of_last_month, month=last_month, day=1) if first_day_of_month.month == 12: next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1) else: next_month = first_day_of_month.replace(month=first_day_of_month.month + 1) current_date = first_day_of_month day_array = [] while current_date < next_month: if str_or_datetime=='str': day_array.append(str(current_date)) elif str_or_datetime=='datetime': day_array.append(current_date) current_date += datetime.timedelta(days=1) return day_array # 获取上上个月的所有日期 def get_days_of_the_month_before_last(str_or_datetime='str'): import datetime import guan today = datetime.date.today() year_of_last_last_month, last_last_month = guan.get_the_month_before_last() first_day_of_month = today.replace(year=year_of_last_last_month, month=last_last_month, day=1) if first_day_of_month.month == 12: next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1) else: next_month = first_day_of_month.replace(month=first_day_of_month.month + 1) current_date = first_day_of_month day_array = [] while current_date < next_month: if str_or_datetime=='str': day_array.append(str(current_date)) elif str_or_datetime=='datetime': day_array.append(current_date) current_date += datetime.timedelta(days=1) return day_array # 获取所有股票 def all_stocks(): import numpy as np import akshare as ak stocks = ak.stock_zh_a_spot_em() title = np.array(stocks.columns) stock_data = stocks.values return title, stock_data # 获取所有股票的代码 def all_stock_symbols(): import guan title, stock_data = guan.all_stocks() stock_symbols = stock_data[:, 1] return stock_symbols # 股票代码的分类 def stock_symbols_classification(): import guan import re stock_symbols = guan.all_stock_symbols() # 上交所主板 stock_symbols_60 = [] for stock_symbol in stock_symbols: find_600 = re.findall(r'^600', stock_symbol) find_601 = re.findall(r'^601', stock_symbol) find_603 = re.findall(r'^603', stock_symbol) find_605 = re.findall(r'^605', stock_symbol) if find_600 != [] or find_601 != [] or find_603 != [] or find_605 != []: stock_symbols_60.append(stock_symbol) # 深交所主板 stock_symbols_00 = [] for stock_symbol in stock_symbols: find_000 = re.findall(r'^000', stock_symbol) find_001 = re.findall(r'^001', stock_symbol) find_002 = re.findall(r'^002', stock_symbol) find_003 = re.findall(r'^003', stock_symbol) if find_000 != [] or find_001 != [] or find_002 != [] or find_003 != []: stock_symbols_00.append(stock_symbol) # 创业板 stock_symbols_30 = [] for stock_symbol in stock_symbols: find_300 = re.findall(r'^300', stock_symbol) find_301 = re.findall(r'^301', stock_symbol) if find_300 != [] or find_301 != []: stock_symbols_30.append(stock_symbol) # 科创板 stock_symbols_68 = [] for stock_symbol in stock_symbols: find_688 = re.findall(r'^688', stock_symbol) find_689 = re.findall(r'^689', stock_symbol) if find_688 != [] or find_689 != []: stock_symbols_68.append(stock_symbol) # 新三板 stock_symbols_8_4 = [] for stock_symbol in stock_symbols: find_82 = re.findall(r'^82', stock_symbol) find_83 = re.findall(r'^83', stock_symbol) find_87 = re.findall(r'^87', stock_symbol) find_88 = re.findall(r'^88', stock_symbol) find_430 = re.findall(r'^430', stock_symbol) find_420 = re.findall(r'^420', stock_symbol) find_400 = re.findall(r'^400', stock_symbol) if find_82 != [] or find_83 != [] or find_87 != [] or find_88 != [] or find_430 != [] or find_420 != [] or find_400 != []: stock_symbols_8_4.append(stock_symbol) # 检查遗漏的股票代码 stock_symbols_others = [] for stock_symbol in stock_symbols: if stock_symbol not in stock_symbols_60 and stock_symbol not in stock_symbols_00 and stock_symbol not in stock_symbols_30 and stock_symbol not in stock_symbols_68 and stock_symbol not in stock_symbols_8_4: stock_symbols_others.others.append(stock_symbol) return stock_symbols_60, stock_symbols_00, stock_symbols_30, stock_symbols_68, stock_symbols_8_4, stock_symbols_others # 股票代码各个分类的数量 def statistics_of_stock_symbols_classification(): import guan stock_symbols_60, stock_symbols_00, stock_symbols_30, stock_symbols_68, stock_symbols_8_4, stock_symbols_others = guan.stock_symbols_classification() num_stocks_60 = len(stock_symbols_60) num_stocks_00 = len(stock_symbols_00) num_stocks_30 = len(stock_symbols_30) num_stocks_68 = len(stock_symbols_68) num_stocks_8_4 = len(stock_symbols_8_4) num_stocks_others= len(stock_symbols_others) return num_stocks_60, num_stocks_00, num_stocks_30, num_stocks_68, num_stocks_8_4, num_stocks_others # 从股票代码获取股票名称 def find_stock_name_from_symbol(symbol='000002'): import guan title, stock_data = guan.all_stocks() for stock in stock_data: if symbol in stock: stock_name = stock[2] return stock_name # 市值排序 def sorted_market_capitalization(num=10): import numpy as np import guan title, stock_data = guan.all_stocks() new_stock_data = [] for stock in stock_data: if np.isnan(float(stock[9])): continue else: new_stock_data.append(stock) new_stock_data = np.array(new_stock_data) list_index = np.argsort(new_stock_data[:, 17]) list_index = list_index[::-1] if num == None: num = len(list_index) sorted_array = [] for i0 in range(num): stock_symbol = new_stock_data[list_index[i0], 1] stock_name = new_stock_data[list_index[i0], 2] market_capitalization = new_stock_data[list_index[i0], 17]/1e8 sorted_array.append([i0+1, stock_symbol, stock_name, market_capitalization]) return sorted_array # 美股市值排序 def sorted_market_capitalization_us(num=10): import akshare as ak import numpy as np stocks = ak.stock_us_spot_em() stock_data = stocks.values new_stock_data = [] for stock in stock_data: if np.isnan(float(stock[9])): continue else: new_stock_data.append(stock) new_stock_data = np.array(new_stock_data) list_index = np.argsort(new_stock_data[:, 9]) list_index = list_index[::-1] if num == None: num = len(list_index) sorted_array = [] for i0 in range(num): stock_symbol = new_stock_data[list_index[i0], 15] stock_name = new_stock_data[list_index[i0], 1] market_capitalization = new_stock_data[list_index[i0], 9]/1e8 sorted_array.append([i0+1, stock_symbol, stock_name, market_capitalization]) return sorted_array # 获取单个股票的历史数据 def history_data_of_one_stock(symbol='000002', period='daily', start_date="19000101", end_date='21000101'): # period = 'daily' # period = 'weekly' # period = 'monthly' import numpy as np import akshare as ak stock = ak.stock_zh_a_hist(symbol=symbol, period=period, start_date=start_date, end_date=end_date) title = np.array(stock.columns) stock_data = stock.values[::-1] return title, stock_data # 绘制股票图 def plot_stock_line(date_array, opening_array, closing_array, high_array, low_array, lw_open_close=6, lw_high_low=2, xlabel='date', ylabel='price', title='', fontsize=20, labelsize=20, adjust_bottom=0.2, adjust_left=0.2): import guan plt, fig, ax = guan.import_plt_and_start_fig_ax(adjust_bottom=adjust_bottom, adjust_left=adjust_left, labelsize=labelsize) ax.set_title(title, fontsize=fontsize, fontfamily='Times New Roman') ax.set_xlabel(xlabel, fontsize=fontsize, fontfamily='Times New Roman') ax.set_ylabel(ylabel, fontsize=fontsize, fontfamily='Times New Roman') for i0 in range(len(date_array)): if opening_array[i0] <= closing_array[i0]: ax.vlines(date_array[i0], opening_array[i0], closing_array[i0], linestyle='-', color='red', lw=lw_open_close) ax.vlines(date_array[i0], low_array[i0], high_array[i0], color='red', linestyle='-', lw=lw_high_low) else: ax.vlines(date_array[i0], opening_array[i0], closing_array[i0], linestyle='-', color='green', lw=lw_open_close) ax.vlines(date_array[i0], low_array[i0], high_array[i0], color='green', linestyle='-', lw=lw_high_low) plt.show() plt.close('all') # 获取软件包中的所有模块名 def get_all_modules_in_one_package(package_name='guan'): import pkgutil package = __import__(package_name) module_names = [name for _, name, _ in pkgutil.iter_modules(package.__path__)] return module_names # 获取软件包中一个模块的所有函数名 def get_all_functions_in_one_module(module_name, package_name='guan'): import inspect function_names = [] module = __import__(f"{package_name}.{module_name}", fromlist=[""]) for name, obj in inspect.getmembers(module): if inspect.isfunction(obj): function_names.append(name) return function_names # 获取软件包中的所有函数名 def get_all_functions_in_one_package(package_name='guan', print_show=1): import guan module_names = guan.get_all_modules_in_one_package(package_name=package_name) all_function_names = [] for module_name in module_names: function_names = guan.get_all_functions_in_one_module(module_name, package_name='guan') if print_show == 1: print('Module:', module_name) for name in function_names: all_function_names.append(name) if print_show == 1: print('function:', name) if print_show == 1: print() return all_function_names # 获取包含某个字符的进程PID值 def get_PID(name): import subprocess command = "ps -ef | grep "+name result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) if result.returncode == 0: ps_ef = result.stdout import re ps_ef = re.split(r'\s+', ps_ef) id_running = ps_ef[1] return id_running # 获取函数的源码 def get_function_source(function_name): import inspect function_source = inspect.getsource(function_name) return function_source # 查找文件名相同的文件 def find_repeated_file_with_same_filename(directory='./', ignored_directory_with_words=[], ignored_file_with_words=[], num=1000): import os from collections import Counter file_list = [] for root, dirs, files in os.walk(directory): for i0 in range(len(files)): file_list.append(files[i0]) for word in ignored_directory_with_words: if word in root: file_list.remove(files[i0]) for word in ignored_file_with_words: if word in files[i0]: try: file_list.remove(files[i0]) except: pass count_file = Counter(file_list).most_common(num) repeated_file = [] for item in count_file: if item[1]>1: repeated_file.append(item) return repeated_file # 统计各个子文件夹中的文件数量 def count_file_in_sub_directory(directory='./', sort=0, reverse=1, print_show=1, smaller_than_num=None): import os import numpy as np dirs_list = [] for root, dirs, files in os.walk(directory): if dirs != []: for i0 in range(len(dirs)): dirs_list.append(root+'/'+dirs[i0]) count_file_array = [] for sub_dir in dirs_list: file_list = [] for root, dirs, files in os.walk(sub_dir): for i0 in range(len(files)): file_list.append(files[i0]) count_file = len(file_list) count_file_array.append(count_file) if sort == 0: if print_show == 1: if smaller_than_num == None: print(sub_dir) print(count_file) print() else: if count_file