From 463fc041f1fed39d5304ca0a7cc17752f0b5cf18 Mon Sep 17 00:00:00 2001 From: guanjihuan Date: Sun, 23 Mar 2025 02:19:27 +0800 Subject: [PATCH] 0.1.169 --- PyPI/src/guan.egg-info/PKG-INFO | 2 +- PyPI/src/guan.egg-info/SOURCES.txt | 1 + PyPI/src/guan/__init__.py | 1 + PyPI/src/guan/data_processing.py | 1372 +-------------------- PyPI/src/guan/file_reading_and_writing.py | 164 +++ PyPI/src/guan/others.py | 1187 ++++++++++++++++++ README.md | 3 +- 7 files changed, 1367 insertions(+), 1363 deletions(-) create mode 100644 PyPI/src/guan/others.py diff --git a/PyPI/src/guan.egg-info/PKG-INFO b/PyPI/src/guan.egg-info/PKG-INFO index c6d15f9..cbd7ac3 100644 --- a/PyPI/src/guan.egg-info/PKG-INFO +++ b/PyPI/src/guan.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 2.4 Name: guan -Version: 0.1.168 +Version: 0.1.169 Summary: An open source python package Home-page: https://py.guanjihuan.com Author: guanjihuan diff --git a/PyPI/src/guan.egg-info/SOURCES.txt b/PyPI/src/guan.egg-info/SOURCES.txt index 4c42c8f..b8fe1e6 100644 --- a/PyPI/src/guan.egg-info/SOURCES.txt +++ b/PyPI/src/guan.egg-info/SOURCES.txt @@ -17,6 +17,7 @@ src/guan/figure_plotting.py src/guan/file_reading_and_writing.py src/guan/functions_using_objects_of_custom_classes.py src/guan/machine_learning.py +src/guan/others.py src/guan/quantum_transport.py src/guan/topological_invariant.py src/guan.egg-info/PKG-INFO diff --git a/PyPI/src/guan/__init__.py b/PyPI/src/guan/__init__.py index 7d83bde..0c6d5a8 100644 --- a/PyPI/src/guan/__init__.py +++ b/PyPI/src/guan/__init__.py @@ -13,6 +13,7 @@ from .file_reading_and_writing import * from .figure_plotting import * from .data_processing import * from .decorators import * +from .others import * from .custom_classes import * from .functions_using_objects_of_custom_classes import * from .deprecated import * diff --git a/PyPI/src/guan/data_processing.py b/PyPI/src/guan/data_processing.py index e9acf29..0d521fc 100644 --- a/PyPI/src/guan/data_processing.py +++ b/PyPI/src/guan/data_processing.py @@ -1,90 +1,15 @@ # Module: data_processing -# AI 对话 -def chat(prompt='你好', model=1, stream=1, stream_label=0): - import requests - url = "http://api.guanjihuan.com/chat" - data = { - "prompt": prompt, - "model": model, - } - if stream == 1: - if stream_label == 1: - print('\n--- Start Chat Stream Message ---\n') - requests_response = requests.post(url, json=data, stream=True) - response = '' - if requests_response.status_code == 200: - for line in requests_response.iter_lines(): - if line: - if stream == 1: - print(line.decode('utf-8'), end='', flush=True) - response += line.decode('utf-8') - print() - else: - pass - if stream == 1: - if stream_label == 1: - print('\n--- End Chat Stream Message ---\n') - return response - -# 加上函数代码的 AI 对话 -def chat_with_function_code(function_name, prompt='', model=1, stream=1): - import guan - function_source = guan.get_source(function_name) - if prompt == '': - response = guan.chat(prompt=function_source, model=model, stream=stream) - else: - response = guan.chat(prompt=function_source+'\n\n'+prompt, model=model, stream=stream) - return response - -# 机器人自动对话 -def auto_chat(prompt='你好', round=2, model=1, stream=1): - import guan - response0 = prompt - for i0 in range(round): - print(f'\n【对话第 {i0+1} 轮】\n') - print('机器人 1: ') - response1 = guan.chat(prompt=response0, model=model, stream=stream) - print('机器人 2: ') - response0 = guan.chat(prompt=response1, model=model, stream=stream) - -# 机器人自动对话(引导对话) -def auto_chat_with_guide(prompt='你好', guide_message='(回答字数少于30个字,最后反问我一个问题)', 
round=5, model=1, stream=1): - import guan - response0 = prompt - for i0 in range(round): - print(f'\n【对话第 {i0+1} 轮】\n') - print('机器人 1: ') - response1 = guan.chat(prompt=response0+guide_message, model=model, stream=stream) - print('机器人 2: ') - response0 = guan.chat(prompt=response1+guide_message, model=model, stream=stream) - -# 在云端服务器上运行函数(需要函数是独立可运行的代码) -def run(function_name, *args, **kwargs): - import requests - import guan - url = "http://run.guanjihuan.com/run_function" - function_source = guan.get_source(function_name) - data = { - "function_name": function_name.__name__, - "function_source": function_source, - 'args': str(args), - 'kwargs': str(kwargs), - } - return_data = None - try: - response = requests.post(url, json=data) - if response.status_code == 200: - result = response.json() - print_data = result['print_data'] - print(print_data, end='') - encoded_return_data = result['encoded_return_data'] - import base64 - import pickle - return_data = pickle.loads(base64.b64decode(encoded_return_data)) - except: - pass - return return_data +# 获取运行的日期和时间并写入文件 +def statistics_with_day_and_time(content='', filename='time_logging', file_format='.txt'): + import datetime + datetime_today = str(datetime.date.today()) + datetime_time = datetime.datetime.now().strftime('%H:%M:%S') + with open(filename+file_format, 'a', encoding="utf-8") as f2: + if content == '': + f2.write(datetime_today+' '+datetime_time+'\n') + else: + f2.write(datetime_today+' '+datetime_time+' '+content+'\n') # 使用该函数获取函数计算时间(秒) def timer(function_name, *args, **kwargs): @@ -102,181 +27,6 @@ def try_except(function_name, *args, **kwargs): except: pass -# 获取运行的日期和时间并写入文件 -def statistics_with_day_and_time(content='', filename='time_logging', file_format='.txt'): - import datetime - datetime_today = str(datetime.date.today()) - datetime_time = datetime.datetime.now().strftime('%H:%M:%S') - with open(filename+file_format, 'a', encoding="utf-8") as f2: - if content == '': - f2.write(datetime_today+' '+datetime_time+'\n') - else: - f2.write(datetime_today+' '+datetime_time+' '+content+'\n') - -# 创建一个sh文件用于提交任务(PBS) -def make_sh_file_for_qsub(sh_filename='a', command_line='python a.py', cpu_num=1, task_name='task', cd_dir=0): - sh_content = \ - '#!/bin/sh\n' \ - +'#PBS -N '+task_name+'\n' \ - +'#PBS -l nodes=1:ppn='+str(cpu_num)+'\n' - if cd_dir==1: - sh_content += 'cd $PBS_O_WORKDIR\n' - sh_content += command_line - with open(sh_filename+'.sh', 'w') as f: - f.write(sh_content) - -# 创建一个sh文件用于提交任务(Slurm) -def make_sh_file_for_sbatch(sh_filename='a', command_line='python a.py', cpu_num=1, task_name='task', cd_dir=0): - sh_content = \ - '#!/bin/sh\n' \ - +'#SBATCH --job-name='+task_name+'\n' \ - +'#SBATCH --cpus-per-task='+str(cpu_num)+'\n' - if cd_dir==1: - sh_content += 'cd $PBS_O_WORKDIR\n' - sh_content += command_line - with open(sh_filename+'.sh', 'w') as f: - f.write(sh_content) - -# 创建一个sh文件用于提交任务(LSF) -def make_sh_file_for_bsub(sh_filename='a', command_line='python a.py', cpu_num=1, task_name='task', cd_dir=0, bsub_q=0, queue_name='score'): - sh_content = \ - '#!/bin/sh\n' \ - +'#BSUB -J '+task_name+'\n' \ - +'#BSUB -n '+str(cpu_num)+'\n' - if bsub_q==1: - sh_content += '#BSUB -q '+queue_name+'\n' - if cd_dir==1: - sh_content += 'cd $PBS_O_WORKDIR\n' - sh_content += command_line - with open(sh_filename+'.sh', 'w') as f: - f.write(sh_content) - -# qsub 提交任务(PBS) -def qsub_task(filename='a', file_format='.sh'): - import os - os.system('qsub '+filename+file_format) - -# sbatch 提交任务(Slurm) -def sbatch_task(filename='a', 
file_format='.sh'): - import os - os.system('sbatch '+filename+file_format) - -# bsub 提交任务(LSF) -def bsub_task(filename='a', file_format='.sh'): - import os - os.system('bsub < '+filename+file_format) - -# 复制.py和.sh文件,然后提交任务,实现半手动并行(PBS) -def copy_py_sh_file_and_qsub_task(parameter_array, py_filename='a', old_str_in_py='parameter = 0', new_str_in_py='parameter = ', sh_filename='a', task_name='task'): - import os - parameter_str_array = [] - for i0 in parameter_array: - parameter_str_array.append(str(i0)) - index = 0 - for parameter_str in parameter_str_array: - index += 1 - # copy python file - old_file = py_filename+'.py' - new_file = py_filename+'_'+str(index)+'.py' - os.system('cp '+old_file+' '+new_file) - with open(new_file, 'r') as f: - content = f.read() - old_str = old_str_in_py - new_str = new_str_in_py+parameter_str - content = content.replace(old_str, new_str) - with open(py_filename+'_'+str(index)+'.py', 'w') as f: - f.write(content) - # copy sh file - old_file = sh_filename+'.sh' - new_file = sh_filename+'_'+str(index)+'.sh' - os.system('cp '+old_file+' '+new_file) - with open(new_file, 'r') as f: - content = f.read() - old_str = 'python '+py_filename+'.py' - new_str = 'python '+py_filename+'_'+str(index)+'.py' - content = content.replace(old_str, new_str) - old_str = '#PBS -N '+task_name - new_str = '#PBS -N '+task_name+'_'+str(index) - content = content.replace(old_str, new_str) - with open(sh_filename+'_'+str(index)+'.sh', 'w') as f: - f.write(content) - # qsub task - os.system('qsub '+new_file) - -# 复制.py和.sh文件,然后提交任务,实现半手动并行(Slurm) -def copy_py_sh_file_and_sbatch_task(parameter_array, py_filename='a', old_str_in_py='parameter = 0', new_str_in_py='parameter = ', sh_filename='a', task_name='task'): - import os - parameter_str_array = [] - for i0 in parameter_array: - parameter_str_array.append(str(i0)) - index = 0 - for parameter_str in parameter_str_array: - index += 1 - # copy python file - old_file = py_filename+'.py' - new_file = py_filename+'_'+str(index)+'.py' - os.system('cp '+old_file+' '+new_file) - with open(new_file, 'r') as f: - content = f.read() - old_str = old_str_in_py - new_str = new_str_in_py+parameter_str - content = content.replace(old_str, new_str) - with open(py_filename+'_'+str(index)+'.py', 'w') as f: - f.write(content) - # copy sh file - old_file = sh_filename+'.sh' - new_file = sh_filename+'_'+str(index)+'.sh' - os.system('cp '+old_file+' '+new_file) - with open(new_file, 'r') as f: - content = f.read() - old_str = 'python '+py_filename+'.py' - new_str = 'python '+py_filename+'_'+str(index)+'.py' - content = content.replace(old_str, new_str) - old_str = '#SBATCH --job-name='+task_name - new_str = '#SBATCH --job-name='+task_name+'_'+str(index) - content = content.replace(old_str, new_str) - with open(sh_filename+'_'+str(index)+'.sh', 'w') as f: - f.write(content) - # sbatch task - os.system('sbatch '+new_file) - -# 复制.py和.sh文件,然后提交任务,实现半手动并行(LSF) -def copy_py_sh_file_and_bsub_task(parameter_array, py_filename='a', old_str_in_py='parameter = 0', new_str_in_py='parameter = ', sh_filename='a', task_name='task'): - import os - parameter_str_array = [] - for i0 in parameter_array: - parameter_str_array.append(str(i0)) - index = 0 - for parameter_str in parameter_str_array: - index += 1 - # copy python file - old_file = py_filename+'.py' - new_file = py_filename+'_'+str(index)+'.py' - os.system('cp '+old_file+' '+new_file) - with open(new_file, 'r') as f: - content = f.read() - old_str = old_str_in_py - new_str = new_str_in_py+parameter_str - content = 
content.replace(old_str, new_str) - with open(py_filename+'_'+str(index)+'.py', 'w') as f: - f.write(content) - # copy sh file - old_file = sh_filename+'.sh' - new_file = sh_filename+'_'+str(index)+'.sh' - os.system('cp '+old_file+' '+new_file) - with open(new_file, 'r') as f: - content = f.read() - old_str = 'python '+py_filename+'.py' - new_str = 'python '+py_filename+'_'+str(index)+'.py' - content = content.replace(old_str, new_str) - old_str = '#BSUB -J '+task_name - new_str = '#BSUB -J '+task_name+'_'+str(index) - content = content.replace(old_str, new_str) - with open(sh_filename+'_'+str(index)+'.sh', 'w') as f: - f.write(content) - # bsub task - os.system('bsub < '+new_file) - # 获取矩阵的维度(考虑单一数值的矩阵维度为1) def dimension_of_array(array): import numpy as np @@ -304,19 +54,6 @@ def rotate_point(x, y, angle_deg): x, y = np.dot(rotation_matrix, np.array([x, y])) return x, y -# CPU性能测试(十亿次循环的浮点加法运算的时间,约30秒左右) -def cpu_test_with_addition(print_show=1): - import time - result = 0.0 - start_time = time.time() - for _ in range(int(1e9)): - result += 1e-9 - end_time = time.time() - run_time = end_time - start_time - if print_show: - print(run_time) - return run_time - # 将XYZ数据转成矩阵数据(说明:x_array/y_array的输入和输出不一样。要求z_array数据中y对应的数据为小循环,x对应的数据为大循环) def convert_xyz_data_into_matrix_data(x_array, y_array, z_array): import numpy as np @@ -535,108 +272,6 @@ def hex_to_rgb(hex): length = len(hex) return tuple(int(hex[i:i+length//3], 16) for i in range(0, length, length//3)) -# 拼接两个PDF文件 -def combine_two_pdf_files(input_file_1='a.pdf', input_file_2='b.pdf', output_file='combined_file.pdf'): - import PyPDF2 - output_pdf = PyPDF2.PdfWriter() - with open(input_file_1, 'rb') as file1: - pdf1 = PyPDF2.PdfReader(file1) - for page in range(len(pdf1.pages)): - output_pdf.add_page(pdf1.pages[page]) - with open(input_file_2, 'rb') as file2: - pdf2 = PyPDF2.PdfReader(file2) - for page in range(len(pdf2.pages)): - output_pdf.add_page(pdf2.pages[page]) - with open(output_file, 'wb') as combined_file: - output_pdf.write(combined_file) - -# 使用pdfminer3k将PDF文件转成文本 -def pdf_to_text_with_pdfminer3k(pdf_path): - from pdfminer.pdfparser import PDFParser, PDFDocument - from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter - from pdfminer.converter import PDFPageAggregator - from pdfminer.layout import LAParams, LTTextBox - from pdfminer.pdfinterp import PDFTextExtractionNotAllowed - import logging - logging.Logger.propagate = False - logging.getLogger().setLevel(logging.ERROR) - praser = PDFParser(open(pdf_path, 'rb')) - doc = PDFDocument() - praser.set_document(doc) - doc.set_parser(praser) - doc.initialize() - if not doc.is_extractable: - raise PDFTextExtractionNotAllowed - else: - rsrcmgr = PDFResourceManager() - laparams = LAParams() - device = PDFPageAggregator(rsrcmgr, laparams=laparams) - interpreter = PDFPageInterpreter(rsrcmgr, device) - content = '' - for page in doc.get_pages(): - interpreter.process_page(page) - layout = device.get_result() - for x in layout: - if isinstance(x, LTTextBox): - content = content + x.get_text().strip() - return content - -# 使用PyPDF2将PDF文件转成文本 -def pdf_to_text_with_PyPDF2_for_all_pages(pdf_path): - import guan - num_pages = guan.get_pdf_page_number(pdf_path) - content = '' - for i0 in range(num_pages): - page_text = guan.pdf_to_txt_for_a_specific_page(pdf_path, page_num=i0+1) - content += page_text + '\n\n' - return content - -# 获取PDF文件页数 -def get_pdf_page_number(pdf_path): - import PyPDF2 - pdf_file = open(pdf_path, 'rb') - pdf_reader = PyPDF2.PdfReader(pdf_file) - 
num_pages = len(pdf_reader.pages) - return num_pages - -# 获取PDF文件指定页面的内容 -def pdf_to_txt_for_a_specific_page(pdf_path, page_num=1): - import PyPDF2 - pdf_file = open(pdf_path, 'rb') - pdf_reader = PyPDF2.PdfReader(pdf_file) - num_pages = len(pdf_reader.pages) - for page_num0 in range(num_pages): - if page_num0 == page_num-1: - page = pdf_reader.pages[page_num0] - page_text = page.extract_text() - pdf_file.close() - return page_text - -# 获取PDF文献中的链接。例如: link_starting_form='https://doi.org' -def get_links_from_pdf(pdf_path, link_starting_form=''): - import PyPDF2 - import re - reader = PyPDF2.PdfReader(pdf_path) - pages = len(reader.pages) - i0 = 0 - links = [] - for page in range(pages): - pageSliced = reader.pages[page] - pageObject = pageSliced.get_object() - if '/Annots' in pageObject.keys(): - ann = pageObject['/Annots'] - old = '' - for a in ann: - u = a.get_object() - if '/A' in u.keys(): - if '/URI' in u['/A']: - if re.search(re.compile('^'+link_starting_form), u['/A']['/URI']): - if u['/A']['/URI'] != old: - links.append(u['/A']['/URI']) - i0 += 1 - old = u['/A']['/URI'] - return links - # 使用MD5进行散列加密 def encryption_MD5(password, salt=''): import hashlib @@ -661,989 +296,4 @@ def encryption_bcrypt(password): # 验证bcrypt加密的密码(这里的hashed_password已经包含了生成时使用的盐,bcrypt.checkpw会自动从hashed_password中提取盐,因此在验证时无需再单独传递盐) def check_bcrypt_hashed_password(password_input, hashed_password): import bcrypt - return bcrypt.checkpw(password_input.encode('utf-8'), hashed_password) - -# 获取当前日期字符串 -def get_date(bar=True): - import datetime - datetime_date = str(datetime.date.today()) - if bar==False: - datetime_date = datetime_date.replace('-', '') - return datetime_date - -# 获取当前时间字符串 -def get_time(colon=True): - import datetime - datetime_time = datetime.datetime.now().strftime('%H:%M:%S') - if colon==False: - datetime_time = datetime_time.replace(':', '') - return datetime_time - -# 获取本月的所有日期 -def get_date_array_of_the_current_month(str_or_datetime='str'): - import datetime - today = datetime.date.today() - first_day_of_month = today.replace(day=1) - if first_day_of_month.month == 12: - next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1) - else: - next_month = first_day_of_month.replace(month=first_day_of_month.month + 1) - current_date = first_day_of_month - date_array = [] - while current_date < next_month: - if str_or_datetime=='str': - date_array.append(str(current_date)) - elif str_or_datetime=='datetime': - date_array.append(current_date) - current_date += datetime.timedelta(days=1) - return date_array - -# 获取上个月份 -def get_last_month(): - import datetime - today = datetime.date.today() - last_month = today.month - 1 - if last_month == 0: - last_month = 12 - year_of_last_month = today.year - 1 - else: - year_of_last_month = today.year - return year_of_last_month, last_month - -# 获取上上个月份 -def get_the_month_before_last(): - import datetime - today = datetime.date.today() - the_month_before_last = today.month - 2 - if the_month_before_last == 0: - the_month_before_last = 12 - year_of_the_month_before_last = today.year - 1 - else: - year_of_the_month_before_last = today.year - if the_month_before_last == -1: - the_month_before_last = 11 - year_of_the_month_before_last = today.year - 1 - else: - year_of_the_month_before_last = today.year - return year_of_the_month_before_last, the_month_before_last - -# 获取上个月的所有日期 -def get_date_array_of_the_last_month(str_or_datetime='str'): - import datetime - import guan - today = datetime.date.today() - year_of_last_month, last_month = 
guan.get_last_month() - first_day_of_month = today.replace(year=year_of_last_month, month=last_month, day=1) - if first_day_of_month.month == 12: - next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1) - else: - next_month = first_day_of_month.replace(month=first_day_of_month.month + 1) - current_date = first_day_of_month - date_array = [] - while current_date < next_month: - if str_or_datetime=='str': - date_array.append(str(current_date)) - elif str_or_datetime=='datetime': - date_array.append(current_date) - current_date += datetime.timedelta(days=1) - return date_array - -# 获取上上个月的所有日期 -def get_date_array_of_the_month_before_last(str_or_datetime='str'): - import datetime - import guan - today = datetime.date.today() - year_of_last_last_month, last_last_month = guan.get_the_month_before_last() - first_day_of_month = today.replace(year=year_of_last_last_month, month=last_last_month, day=1) - if first_day_of_month.month == 12: - next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1) - else: - next_month = first_day_of_month.replace(month=first_day_of_month.month + 1) - current_date = first_day_of_month - date_array = [] - while current_date < next_month: - if str_or_datetime=='str': - date_array.append(str(current_date)) - elif str_or_datetime=='datetime': - date_array.append(current_date) - current_date += datetime.timedelta(days=1) - return date_array - -# 根据新的日期,填充数组中缺少的数据为零 -def fill_zero_data_for_new_dates(old_dates, new_dates, old_data_array): - new_data_array = [] - for date in new_dates: - if str(date) not in old_dates: - new_data_array.append(0) - else: - index = old_dates.index(date) - new_data_array.append(old_data_array[index]) - return new_data_array - -# 获取内存信息 -def get_memory_info(): - import psutil - memory_info = psutil.virtual_memory() - total_memory = memory_info.total/(1024**2) - used_memory = memory_info.used/(1024**2) - available_memory = memory_info.available/(1024**2) - used_memory_percent = memory_info.percent - return total_memory, used_memory, available_memory, used_memory_percent - -# 获取CPU的平均使用率 -def get_cpu_usage(interval=1): - import psutil - cpu_usage = psutil.cpu_percent(interval=interval) - return cpu_usage - -# 获取每个CPU核心的使用率,返回列表 -def get_cpu_usage_array_per_core(interval=1): - import psutil - cpu_usage_array_per_core = psutil.cpu_percent(interval=interval, percpu=True) - return cpu_usage_array_per_core - -# 获取使用率最高的CPU核心的使用率 -def get_cpu_max_usage_for_all_cores(interval=1): - import guan - cpu_usage_array_per_core = guan.get_cpu_usage_array_per_core(interval=interval) - max_cpu_usage = max(cpu_usage_array_per_core) - return max_cpu_usage - -# 获取非零使用率的CPU核心的平均使用率 -def get_cpu_averaged_usage_for_non_zero_cores(interval=1): - import guan - cpu_usage_array_per_core = guan.get_cpu_usage_array_per_core(interval=interval) - cpu_usage_array_per_core_new = guan.remove_item_in_one_array(cpu_usage_array_per_core, 0.0) - averaged_cpu_usage = sum(cpu_usage_array_per_core_new)/len(cpu_usage_array_per_core_new) - return averaged_cpu_usage - -# 在一定数量周期内得到CPU的使用率信息。默认为1秒钟收集一次,(interval+sleep_interval)*times 为收集的时间范围,范围默认为60秒,即1分钟后返回列表,总共得到60组数据。其中,数字第一列和第二列分别是平均值和最大值。 -def get_cpu_information_for_times(interval=1, sleep_interval=0, times=60): - import guan - import time - cpu_information_array = [] - for _ in range(times): - cpu_information = [] - datetime_date = guan.get_date() - datetime_time = guan.get_time() - cpu_information.append(datetime_date) - cpu_information.append(datetime_time) - 
cpu_usage_array_per_core = guan.get_cpu_usage_array_per_core(interval=interval) - cpu_information.append(sum(cpu_usage_array_per_core)/len(cpu_usage_array_per_core)) - cpu_information.append(max(cpu_usage_array_per_core)) - for cpu_usage in cpu_usage_array_per_core: - cpu_information.append(cpu_usage) - cpu_information_array.append(cpu_information) - time.sleep(sleep_interval) - return cpu_information_array - -# 将得到的CPU的使用率信息写入文件。默认为1分钟收集一次,(interval+sleep_interval)*times 为收集的时间范围,范围默认为60分钟,即1小时写入文件一次,总共得到60组数据。其中,数字第一列和第二列分别是平均值和最大值。 -def write_cpu_information_to_file(filename='./cpu_usage', interval=1, sleep_interval=59, times=60): - import guan - guan.make_file(filename+'.txt') - while True: - f = guan.open_file(filename) - cpu_information_array = guan.get_cpu_information_for_times(interval=interval, sleep_interval=sleep_interval, times=times) - for cpu_information in cpu_information_array: - i0 = 0 - for information in cpu_information: - if i0 < 2: - f.write(str(information)+' ') - else: - f.write(f'{information:.1f} ') - i0 += 1 - f.write('\n') - f.close() - -# 画CPU的使用率图。默认为画最近的60个数据,以及不画CPU核心的最大使用率。 -def plot_cpu_information(filename='./cpu_usage', recent_num=60, max_cpu=0): - import guan - from datetime import datetime - with open(filename+".txt", "r") as file: - lines = file.readlines() - lines = lines[::-1] - timestamps_array = [] - averaged_cpu_usage_array = [] - max_cpu_usage_array = [] - i0 = 0 - for line in lines: - i0 += 1 - if i0 >= recent_num: - break - cpu_information = line.strip() - information = cpu_information.split() - time_str = information[0]+' '+information[1] - time_format = "%Y-%m-%d %H:%M:%S" - timestamps_array.append(datetime.strptime(time_str, time_format)) - averaged_cpu_usage_array.append(float(information[2])) - max_cpu_usage_array.append(float(information[3])) - plt, fig, ax = guan.import_plt_and_start_fig_ax(adjust_bottom=0.3, adjust_left=0.15, labelsize=16, fontfamily='Times New Roman') - plt.xticks(rotation=90) - guan.plot_without_starting_fig_ax(plt, fig, ax, timestamps_array, averaged_cpu_usage_array, style='o-') - legend_array = ['Averaged'] - if max_cpu == 1: - guan.plot_without_starting_fig_ax(plt, fig, ax, timestamps_array, max_cpu_usage_array, style='o-') - legend_array.append('Max') - guan.plot_without_starting_fig_ax(plt, fig, ax, [], [], xlabel='Time', ylabel='CPU usage', fontsize=20) - plt.legend(legend_array) - plt.show() - -# 画详细的CPU的使用率图,分CPU核心画图。 -def plot_detailed_cpu_information(filename='./cpu_usage', recent_num=60): - import guan - from datetime import datetime - with open(filename+".txt", "r") as file: - lines = file.readlines() - lines = lines[::-1] - timestamps_array = [] - i0 = 0 - core_num = len(lines[0].strip().split())-4 - detailed_cpu_usage_array = [] - for line in lines: - i0 += 1 - if i0 > recent_num: - break - cpu_information = line.strip() - information = cpu_information.split() - time_str = information[0]+' '+information[1] - time_format = "%Y-%m-%d %H:%M:%S" - timestamps_array.append(datetime.strptime(time_str, time_format)) - detailed_cpu_usage = [] - for core in range(core_num): - detailed_cpu_usage.append(float(information[4+core])) - detailed_cpu_usage_array.append(detailed_cpu_usage) - for core in range(core_num): - plt, fig, ax = guan.import_plt_and_start_fig_ax(adjust_bottom=0.3, adjust_left=0.15, labelsize=16, fontfamily='Times New Roman') - plt.xticks(rotation=90) - guan.plot_without_starting_fig_ax(plt, fig, ax, timestamps_array, [row[core] for row in detailed_cpu_usage_array], style='o-') - legend_array = [] - 
legend_array.append(f'CPU {core+1}') - guan.plot_without_starting_fig_ax(plt, fig, ax, [], [], xlabel='Time', ylabel='CPU usage', fontsize=20) - plt.legend(legend_array) - plt.show() - -# 获取MAC地址 -def get_mac_address(): - import uuid - mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:].upper() - mac_address = '-'.join([mac_address[i:i+2] for i in range(0, 11, 2)]) - return mac_address - -# 获取软件包中的所有模块名 -def get_all_modules_in_one_package(package_name='guan'): - import pkgutil - package = __import__(package_name) - module_names = [name for _, name, _ in pkgutil.iter_modules(package.__path__)] - return module_names - -# 获取软件包中一个模块的所有函数名 -def get_all_functions_in_one_module(module_name, package_name='guan'): - import inspect - function_names = [] - module = __import__(f"{package_name}.{module_name}", fromlist=[""]) - for name, obj in inspect.getmembers(module): - if inspect.isfunction(obj): - function_names.append(name) - return function_names - -# 获取软件包中的所有函数名 -def get_all_functions_in_one_package(package_name='guan', print_show=1): - import guan - module_names = guan.get_all_modules_in_one_package(package_name=package_name) - all_function_names = [] - for module_name in module_names: - function_names = guan.get_all_functions_in_one_module(module_name, package_name='guan') - if print_show == 1: - print('Module:', module_name) - for name in function_names: - all_function_names.append(name) - if print_show == 1: - print('function:', name) - if print_show == 1: - print() - return all_function_names - -# 获取调用本函数的函数名 -def get_calling_function_name(layer=1): - import inspect - caller = inspect.stack()[layer] - calling_function_name = caller.function - return calling_function_name - -# 统计Python文件中import的数量并排序 -def count_number_of_import_statements(filename, file_format='.py', num=1000): - with open(filename+file_format, 'r') as file: - lines = file.readlines() - import_array = [] - for line in lines: - if 'import ' in line: - line = line.strip() - import_array.append(line) - from collections import Counter - import_statement_counter = Counter(import_array).most_common(num) - return import_statement_counter - -# 获取软件包的本机版本 -def get_current_version(package_name='guan'): - import importlib.metadata - try: - current_version = importlib.metadata.version(package_name) - return current_version - except: - return None - -# 获取Python软件包的最新版本 -def get_latest_version(package_name='guan', timeout=5): - import requests - url = f"https://pypi.org/pypi/{package_name}/json" - try: - response = requests.get(url, timeout=timeout) - except: - return None - if response.status_code == 200: - data = response.json() - latest_version = data["info"]["version"] - return latest_version - else: - return None - -# 获取包含某个字符的进程PID值 -def get_PID_array(name): - import subprocess - command = "ps -ef | grep "+name - result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) - if result.returncode == 0: - ps_ef = result.stdout - import re - ps_ef_1 = re.split(r'\n', ps_ef) - id_running_array = [] - for ps_ef_item in ps_ef_1: - if ps_ef_item != '': - ps_ef_2 = re.split(r'\s+', ps_ef_item) - id_running_array.append(ps_ef_2[1]) - return id_running_array - -# 寻找所有的git仓库 -def find_git_repositories(base_path='./', ignored_directory_with_words=[]): - import os - git_repository_array = [] - for root, dirs, files in os.walk(base_path): - if '.git' in dirs: - ignore_signal = 0 - for word in ignored_directory_with_words: - if word in root: - ignore_signal = 1 - break - if ignore_signal == 0: - 
git_repository_array.append(root) - return git_repository_array - -# 在git仓库列表中找到有修改待commit的 -def get_git_repositories_to_commit(git_repository_array): - import os - import subprocess - git_repository_array_to_commit = [] - for repository in git_repository_array: - os.chdir(repository) - status = subprocess.check_output(['git', 'status']).decode('utf-8') - if 'nothing to commit, working tree clean' in status: - pass - else: - git_repository_array_to_commit.append(repository) - return git_repository_array_to_commit - -# 每日git commit次数的统计 -def statistics_of_git_commits(print_show=0, str_or_datetime='str'): - import subprocess - import collections - since_date = '100 year ago' - result = subprocess.run( - ['git', 'log', f'--since={since_date}', '--pretty=format:%ad', '--date=short'], - stdout=subprocess.PIPE, - text=True) - commits = result.stdout.strip().split('\n') - counter = collections.Counter(commits) - daily_commit_counts = dict(sorted(counter.items())) - date_array = [] - commit_count_array = [] - for date, count in daily_commit_counts.items(): - if print_show == 1: - print(f"{date}: {count} commits") - if str_or_datetime=='datetime': - import datetime - date_array.append(datetime.datetime.strptime(date, "%Y-%m-%d")) - elif str_or_datetime=='str': - date_array.append(date) - commit_count_array.append(count) - return date_array, commit_count_array - -# 将文件目录结构写入Markdown文件 -def write_file_list_in_markdown(directory='./', filename='a', reverse_positive_or_negative=1, starting_from_h1=None, banned_file_format=[], hide_file_format=None, divided_line=None, show_second_number=None, show_third_number=None): - import os - f = open(filename+'.md', 'w', encoding="utf-8") - filenames1 = os.listdir(directory) - u0 = 0 - for filename1 in filenames1[::reverse_positive_or_negative]: - filename1_with_path = os.path.join(directory,filename1) - if os.path.isfile(filename1_with_path): - if os.path.splitext(filename1)[1] not in banned_file_format: - if hide_file_format == None: - f.write('+ '+str(filename1)+'\n\n') - else: - f.write('+ '+str(os.path.splitext(filename1)[0])+'\n\n') - else: - u0 += 1 - if divided_line != None and u0 != 1: - f.write('--------\n\n') - if starting_from_h1 == None: - f.write('#') - f.write('# '+str(filename1)+'\n\n') - - filenames2 = os.listdir(filename1_with_path) - i0 = 0 - for filename2 in filenames2[::reverse_positive_or_negative]: - filename2_with_path = os.path.join(directory, filename1, filename2) - if os.path.isfile(filename2_with_path): - if os.path.splitext(filename2)[1] not in banned_file_format: - if hide_file_format == None: - f.write('+ '+str(filename2)+'\n\n') - else: - f.write('+ '+str(os.path.splitext(filename2)[0])+'\n\n') - else: - i0 += 1 - if starting_from_h1 == None: - f.write('#') - if show_second_number != None: - f.write('## '+str(i0)+'. 
'+str(filename2)+'\n\n') - else: - f.write('## '+str(filename2)+'\n\n') - - j0 = 0 - filenames3 = os.listdir(filename2_with_path) - for filename3 in filenames3[::reverse_positive_or_negative]: - filename3_with_path = os.path.join(directory, filename1, filename2, filename3) - if os.path.isfile(filename3_with_path): - if os.path.splitext(filename3)[1] not in banned_file_format: - if hide_file_format == None: - f.write('+ '+str(filename3)+'\n\n') - else: - f.write('+ '+str(os.path.splitext(filename3)[0])+'\n\n') - else: - j0 += 1 - if starting_from_h1 == None: - f.write('#') - if show_third_number != None: - f.write('### ('+str(j0)+') '+str(filename3)+'\n\n') - else: - f.write('### '+str(filename3)+'\n\n') - - filenames4 = os.listdir(filename3_with_path) - for filename4 in filenames4[::reverse_positive_or_negative]: - filename4_with_path = os.path.join(directory, filename1, filename2, filename3, filename4) - if os.path.isfile(filename4_with_path): - if os.path.splitext(filename4)[1] not in banned_file_format: - if hide_file_format == None: - f.write('+ '+str(filename4)+'\n\n') - else: - f.write('+ '+str(os.path.splitext(filename4)[0])+'\n\n') - else: - if starting_from_h1 == None: - f.write('#') - f.write('#### '+str(filename4)+'\n\n') - - filenames5 = os.listdir(filename4_with_path) - for filename5 in filenames5[::reverse_positive_or_negative]: - filename5_with_path = os.path.join(directory, filename1, filename2, filename3, filename4, filename5) - if os.path.isfile(filename5_with_path): - if os.path.splitext(filename5)[1] not in banned_file_format: - if hide_file_format == None: - f.write('+ '+str(filename5)+'\n\n') - else: - f.write('+ '+str(os.path.splitext(filename5)[0])+'\n\n') - else: - if starting_from_h1 == None: - f.write('#') - f.write('##### '+str(filename5)+'\n\n') - - filenames6 = os.listdir(filename5_with_path) - for filename6 in filenames6[::reverse_positive_or_negative]: - filename6_with_path = os.path.join(directory, filename1, filename2, filename3, filename4, filename5, filename6) - if os.path.isfile(filename6_with_path): - if os.path.splitext(filename6)[1] not in banned_file_format: - if hide_file_format == None: - f.write('+ '+str(filename6)+'\n\n') - else: - f.write('+ '+str(os.path.splitext(filename6)[0])+'\n\n') - else: - if starting_from_h1 == None: - f.write('#') - f.write('###### '+str(filename6)+'\n\n') - f.close() - -# 从网页的标签中获取内容 -def get_html_from_tags(link, tags=['title', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'li', 'a']): - from bs4 import BeautifulSoup - import urllib.request - import ssl - ssl._create_default_https_context = ssl._create_unverified_context - html = urllib.request.urlopen(link).read().decode('utf-8') - soup = BeautifulSoup(html, features="lxml") - all_tags = soup.find_all(tags) - content = '' - for tag in all_tags: - text = tag.get_text().replace('\n', '') - if content == '': - content = text - else: - content = content + '\n\n' + text - return content - -# 从HTML中获取所有的链接 -def get_links_from_html(html_link, links_with_text=0): - from bs4 import BeautifulSoup - import urllib.request - import ssl - ssl._create_default_https_context = ssl._create_unverified_context - html = urllib.request.urlopen(html_link).read().decode('utf-8') - soup = BeautifulSoup(html, features="lxml") - a_tags = soup.find_all('a') - if links_with_text == 0: - link_array = [tag.get('href') for tag in a_tags if tag.get('href')] - return link_array - else: - link_array_with_text = [(tag.get('href'), tag.text) for tag in a_tags if tag.get('href')] - return link_array_with_text 
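# A minimal usage sketch for the two HTML helpers above (get_html_from_tags and
# get_links_from_html), assuming guan, bs4, and lxml are installed and the page is
# reachable; the URL is one that already appears in this package and is illustrative only:
#     import guan
#     text = guan.get_html_from_tags('https://www.guanjihuan.com', tags=['title', 'p'])
#     links = guan.get_links_from_html('https://www.guanjihuan.com', links_with_text=0)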
- -# 检查链接的有效性 -def check_link(url, timeout=3, allow_redirects=True): - import requests - try: - response = requests.head(url, timeout=timeout, allow_redirects=allow_redirects) - if response.status_code == 200: - return True - else: - return False - except requests.exceptions.RequestException: - return False - -# 检查链接数组中链接的有效性 -def check_link_array(link_array, timeout=3, allow_redirects=True, try_again=0, print_show=1): - import guan - failed_link_array0 = [] - for link in link_array: - if link=='#' or guan.check_link(link, timeout=timeout, allow_redirects=allow_redirects): - pass - else: - failed_link_array0.append(link) - if print_show: - print(link) - failed_link_array = [] - if try_again: - if print_show: - print('\nTry again:\n') - for link in failed_link_array0: - if link=='#' or guan.check_link(link, timeout=timeout, allow_redirects=allow_redirects): - pass - else: - failed_link_array.append(link) - if print_show: - print(link) - else: - failed_link_array = failed_link_array0 - return failed_link_array - -# 生成二维码 -def creat_qrcode(data="https://www.guanjihuan.com", filename='a', file_format='.png'): - import qrcode - img = qrcode.make(data) - img.save(filename+file_format) - -# 通过Sci-Hub网站下载文献 -def download_with_scihub(address=None, num=1): - from bs4 import BeautifulSoup - import re - import requests - import os - if num==1 and address!=None: - address_array = [address] - else: - address_array = [] - for i in range(num): - address = input('\nInput:') - address_array.append(address) - for address in address_array: - r = requests.post('https://sci-hub.st/', data={'request': address}) - print('\nResponse:', r) - print('Address:', r.url) - soup = BeautifulSoup(r.text, features='lxml') - pdf_URL = soup.embed['src'] - # pdf_URL = soup.iframe['src'] # This is a code line of history version which fails to get pdf URL. 
- if re.search(re.compile('^https:'), pdf_URL): - pass - else: - pdf_URL = 'https:'+pdf_URL - print('PDF address:', pdf_URL) - name = re.search(re.compile('fdp.*?/'),pdf_URL[::-1]).group()[::-1][1::] - print('PDF name:', name) - print('Directory:', os.getcwd()) - print('\nDownloading...') - r = requests.get(pdf_URL, stream=True) - with open(name, 'wb') as f: - for chunk in r.iter_content(chunk_size=32): - f.write(chunk) - print('Completed!\n') - if num != 1: - print('All completed!\n') - -# 将字符串转成音频 -def str_to_audio(str='hello world', filename='str', rate=125, voice=1, read=1, save=0, compress=0, bitrate='16k', print_text=0): - import pyttsx3 - import guan - if print_text==1: - print(str) - engine = pyttsx3.init() - voices = engine.getProperty('voices') - engine.setProperty('voice', voices[voice].id) - engine.setProperty("rate", rate) - if save==1: - engine.save_to_file(str, filename+'.wav') - engine.runAndWait() - print('Wav file saved!') - if compress==1: - import os - os.rename(filename+'.wav', 'temp.wav') - guan.compress_wav_to_mp3('temp.wav', output_filename=filename+'.mp3', bitrate=bitrate) - os.remove('temp.wav') - if read==1: - engine.say(str) - engine.runAndWait() - -# 将txt文件转成音频 -def txt_to_audio(txt_path, rate=125, voice=1, read=1, save=0, compress=0, bitrate='16k', print_text=0): - import pyttsx3 - import guan - f = open(txt_path, 'r', encoding ='utf-8') - text = f.read() - if print_text==1: - print(text) - engine = pyttsx3.init() - voices = engine.getProperty('voices') - engine.setProperty('voice', voices[voice].id) - engine.setProperty("rate", rate) - if save==1: - import re - filename = re.split('[/,\\\]', txt_path)[-1][:-4] - engine.save_to_file(text, filename+'.wav') - engine.runAndWait() - print('Wav file saved!') - if compress==1: - import os - os.rename(filename+'.wav', 'temp.wav') - guan.compress_wav_to_mp3('temp.wav', output_filename=filename+'.mp3', bitrate=bitrate) - os.remove('temp.wav') - if read==1: - engine.say(text) - engine.runAndWait() - -# 将PDF文件转成音频 -def pdf_to_audio(pdf_path, rate=125, voice=1, read=1, save=0, compress=0, bitrate='16k', print_text=0): - import pyttsx3 - import guan - text = guan.pdf_to_text(pdf_path) - text = text.replace('\n', ' ') - if print_text==1: - print(text) - engine = pyttsx3.init() - voices = engine.getProperty('voices') - engine.setProperty('voice', voices[voice].id) - engine.setProperty("rate", rate) - if save==1: - import re - filename = re.split('[/,\\\]', pdf_path)[-1][:-4] - engine.save_to_file(text, filename+'.wav') - engine.runAndWait() - print('Wav file saved!') - if compress==1: - import os - os.rename(filename+'.wav', 'temp.wav') - guan.compress_wav_to_mp3('temp.wav', output_filename=filename+'.mp3', bitrate=bitrate) - os.remove('temp.wav') - if read==1: - engine.say(text) - engine.runAndWait() - -# 将wav音频文件压缩成MP3音频文件 -def compress_wav_to_mp3(wav_path, output_filename='a.mp3', bitrate='16k'): - # Note: Beside the installation of pydub, you may also need download FFmpeg on http://www.ffmpeg.org/download.html and add the bin path to the environment variable. 
- from pydub import AudioSegment - sound = AudioSegment.from_mp3(wav_path) - sound.export(output_filename,format="mp3",bitrate=bitrate) - -# 将WordPress导出的XML格式文件转换成多个MarkDown格式的文件 -def convert_wordpress_xml_to_markdown(xml_file='./a.xml', convert_content=1, replace_more=[]): - import xml.etree.ElementTree as ET - import re - tree = ET.parse(xml_file) - root = tree.getroot() - for item in root.findall('.//item'): - title = item.find('title').text - content = item.find('.//content:encoded', namespaces={'content': 'http://purl.org/rss/1.0/modules/content/'}).text - if convert_content == 1: - try: - content = re.sub(r'', '', content) - content = content.replace('

', '') - content = content.replace('

', '') - content = content.replace('
    ', '') - content = content.replace('
', '') - content = content.replace('', '') - content = content.replace('', '') - content = content.replace('', '') - content = content.replace('', '') - content = content.replace('
  • ', '+ ') - content = content.replace('', '') - content = re.sub(r'', '## ', content) - content = re.sub(r'', '### ', content) - content = re.sub(r'', '#### ', content) - for replace_item in replace_more: - content = content.replace(replace_item, '') - for _ in range(100): - content = content.replace('\n\n\n', '\n\n') - except: - print(f'提示:字符串替换出现问题!出现问题的内容为:{content}') - else: - pass - markdown_content = f"# {title}\n{content}" - markdown_file_path = f"{title}.md" - cleaned_filename = re.sub(r'[/:*?"<>|\'\\]', ' ', markdown_file_path) - with open(cleaned_filename, 'w', encoding='utf-8') as md_file: - md_file.write(markdown_content) - -# 凯利公式 -def kelly_formula(p, b, a=1): - f=(p/a)-((1-p)/b) - return f - -# 获取所有股票 -def all_stocks(): - import numpy as np - import akshare as ak - stocks = ak.stock_zh_a_spot_em() - title = np.array(stocks.columns) - stock_data = stocks.values - return title, stock_data - -# 获取所有股票的代码 -def all_stock_symbols(): - import guan - title, stock_data = guan.all_stocks() - stock_symbols = stock_data[:, 1] - return stock_symbols - -# 股票代码的分类 -def stock_symbols_classification(): - import guan - import re - stock_symbols = guan.all_stock_symbols() - # 上交所主板 - stock_symbols_60 = [] - for stock_symbol in stock_symbols: - find_600 = re.findall(r'^600', stock_symbol) - find_601 = re.findall(r'^601', stock_symbol) - find_603 = re.findall(r'^603', stock_symbol) - find_605 = re.findall(r'^605', stock_symbol) - if find_600 != [] or find_601 != [] or find_603 != [] or find_605 != []: - stock_symbols_60.append(stock_symbol) - # 深交所主板 - stock_symbols_00 = [] - for stock_symbol in stock_symbols: - find_000 = re.findall(r'^000', stock_symbol) - find_001 = re.findall(r'^001', stock_symbol) - find_002 = re.findall(r'^002', stock_symbol) - find_003 = re.findall(r'^003', stock_symbol) - if find_000 != [] or find_001 != [] or find_002 != [] or find_003 != []: - stock_symbols_00.append(stock_symbol) - # 创业板 - stock_symbols_30 = [] - for stock_symbol in stock_symbols: - find_300 = re.findall(r'^300', stock_symbol) - find_301 = re.findall(r'^301', stock_symbol) - if find_300 != [] or find_301 != []: - stock_symbols_30.append(stock_symbol) - # 科创板 - stock_symbols_68 = [] - for stock_symbol in stock_symbols: - find_688 = re.findall(r'^688', stock_symbol) - find_689 = re.findall(r'^689', stock_symbol) - if find_688 != [] or find_689 != []: - stock_symbols_68.append(stock_symbol) - # 新三板 - stock_symbols_8_4 = [] - for stock_symbol in stock_symbols: - find_82 = re.findall(r'^82', stock_symbol) - find_83 = re.findall(r'^83', stock_symbol) - find_87 = re.findall(r'^87', stock_symbol) - find_88 = re.findall(r'^88', stock_symbol) - find_430 = re.findall(r'^430', stock_symbol) - find_420 = re.findall(r'^420', stock_symbol) - find_400 = re.findall(r'^400', stock_symbol) - if find_82 != [] or find_83 != [] or find_87 != [] or find_88 != [] or find_430 != [] or find_420 != [] or find_400 != []: - stock_symbols_8_4.append(stock_symbol) - # 检查遗漏的股票代码 - stock_symbols_others = [] - for stock_symbol in stock_symbols: - if stock_symbol not in stock_symbols_60 and stock_symbol not in stock_symbols_00 and stock_symbol not in stock_symbols_30 and stock_symbol not in stock_symbols_68 and stock_symbol not in stock_symbols_8_4: - stock_symbols_others.others.append(stock_symbol) - return stock_symbols_60, stock_symbols_00, stock_symbols_30, stock_symbols_68, stock_symbols_8_4, stock_symbols_others - -# 股票代码各个分类的数量 -def statistics_of_stock_symbols_classification(): - import guan - stock_symbols_60, 
stock_symbols_00, stock_symbols_30, stock_symbols_68, stock_symbols_8_4, stock_symbols_others = guan.stock_symbols_classification() - num_stocks_60 = len(stock_symbols_60) - num_stocks_00 = len(stock_symbols_00) - num_stocks_30 = len(stock_symbols_30) - num_stocks_68 = len(stock_symbols_68) - num_stocks_8_4 = len(stock_symbols_8_4) - num_stocks_others= len(stock_symbols_others) - return num_stocks_60, num_stocks_00, num_stocks_30, num_stocks_68, num_stocks_8_4, num_stocks_others - -# 从股票代码获取股票名称 -def find_stock_name_from_symbol(symbol='000002'): - import guan - title, stock_data = guan.all_stocks() - for stock in stock_data: - if symbol in stock: - stock_name = stock[2] - return stock_name - -# 市值排序 -def sorted_market_capitalization(num=10): - import numpy as np - import guan - title, stock_data = guan.all_stocks() - new_stock_data = [] - for stock in stock_data: - if np.isnan(float(stock[9])): - continue - else: - new_stock_data.append(stock) - new_stock_data = np.array(new_stock_data) - list_index = np.argsort(new_stock_data[:, 17]) - list_index = list_index[::-1] - if num == None: - num = len(list_index) - sorted_array = [] - for i0 in range(num): - stock_symbol = new_stock_data[list_index[i0], 1] - stock_name = new_stock_data[list_index[i0], 2] - market_capitalization = new_stock_data[list_index[i0], 17]/1e8 - sorted_array.append([i0+1, stock_symbol, stock_name, market_capitalization]) - return sorted_array - -# 美股市值排序 -def sorted_market_capitalization_us(num=10): - import akshare as ak - import numpy as np - stocks = ak.stock_us_spot_em() - stock_data = stocks.values - new_stock_data = [] - for stock in stock_data: - if np.isnan(float(stock[9])): - continue - else: - new_stock_data.append(stock) - new_stock_data = np.array(new_stock_data) - list_index = np.argsort(new_stock_data[:, 9]) - list_index = list_index[::-1] - if num == None: - num = len(list_index) - sorted_array = [] - for i0 in range(num): - stock_symbol = new_stock_data[list_index[i0], 15] - stock_name = new_stock_data[list_index[i0], 1] - market_capitalization = new_stock_data[list_index[i0], 9]/1e8 - sorted_array.append([i0+1, stock_symbol, stock_name, market_capitalization]) - return sorted_array - -# 获取单个股票的历史数据 -def history_data_of_one_stock(symbol='000002', period='daily', start_date="19000101", end_date='21000101'): - # period = 'daily' - # period = 'weekly' - # period = 'monthly' - import numpy as np - import akshare as ak - stock = ak.stock_zh_a_hist(symbol=symbol, period=period, start_date=start_date, end_date=end_date) - title = np.array(stock.columns) - stock_data = stock.values[::-1] - return title, stock_data - -# 绘制股票图 -def plot_stock_line(date_array, opening_array, closing_array, high_array, low_array, lw_open_close=6, lw_high_low=2, xlabel='date', ylabel='price', title='', fontsize=20, labelsize=20, adjust_bottom=0.2, adjust_left=0.2, fontfamily='Times New Roman'): - import guan - plt, fig, ax = guan.import_plt_and_start_fig_ax(adjust_bottom=adjust_bottom, adjust_left=adjust_left, labelsize=labelsize, fontfamily=fontfamily) - if fontfamily=='Times New Roman': - ax.set_title(title, fontsize=fontsize, fontfamily='Times New Roman') - ax.set_xlabel(xlabel, fontsize=fontsize, fontfamily='Times New Roman') - ax.set_ylabel(ylabel, fontsize=fontsize, fontfamily='Times New Roman') - else: - ax.set_title(title, fontsize=fontsize) - ax.set_xlabel(xlabel, fontsize=fontsize) - ax.set_ylabel(ylabel, fontsize=fontsize) - for i0 in range(len(date_array)): - if opening_array[i0] <= closing_array[i0]: - 
ax.vlines(date_array[i0], opening_array[i0], closing_array[i0], linestyle='-', color='red', lw=lw_open_close) - ax.vlines(date_array[i0], low_array[i0], high_array[i0], color='red', linestyle='-', lw=lw_high_low) - else: - ax.vlines(date_array[i0], opening_array[i0], closing_array[i0], linestyle='-', color='green', lw=lw_open_close) - ax.vlines(date_array[i0], low_array[i0], high_array[i0], color='green', linestyle='-', lw=lw_high_low) - plt.show() - plt.close('all') - -# Guan软件包的使用统计(仅仅统计装机数和import次数) -def statistics_of_guan_package(function_name=None): - import guan - try: - import socket - datetime_date = guan.get_date() - datetime_time = guan.get_time() - current_version = guan.get_current_version('guan') - client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client_socket.settimeout(0.5) - client_socket.connect(('socket.guanjihuan.com', 12345)) - mac_address = guan.get_mac_address() - if function_name == None: - message = { - 'server': 'py.guanjihuan.com', - 'date': datetime_date, - 'time': datetime_time, - 'version': current_version, - 'MAC_address': mac_address, - } - else: - message = { - 'server': 'py.guanjihuan.com', - 'date': datetime_date, - 'time': datetime_time, - 'version': current_version, - 'MAC_address': mac_address, - 'function_name': function_name - } - import json - send_message = json.dumps(message) - client_socket.send(send_message.encode()) - client_socket.close() - except: - pass - -# Guan软件包升级检查和提示(如果无法连接或者版本为最新,那么均没有提示) -def notification_of_upgrade(timeout=5): - try: - import guan - latest_version = guan.get_latest_version(package_name='guan', timeout=timeout) - current_version = guan.get_current_version('guan') - if latest_version != None and current_version != None: - if latest_version != current_version: - print('升级提示:您当前使用的版本是 guan-'+current_version+',目前已经有最新版本 guan-'+latest_version+'。您可以通过以下命令对软件包进行升级:pip install --upgrade guan -i https://pypi.python.org/simple 或 pip install --upgrade guan') - except: - pass \ No newline at end of file + return bcrypt.checkpw(password_input.encode('utf-8'), hashed_password) \ No newline at end of file diff --git a/PyPI/src/guan/file_reading_and_writing.py b/PyPI/src/guan/file_reading_and_writing.py index 817fc6d..165042a 100644 --- a/PyPI/src/guan/file_reading_and_writing.py +++ b/PyPI/src/guan/file_reading_and_writing.py @@ -243,6 +243,170 @@ def write_two_dimensional_data_without_xy_array_and_without_opening_file(matrix, f.write(str(element)+' ') f.write('\n') +# 创建一个sh文件用于提交任务(PBS) +def make_sh_file_for_qsub(sh_filename='a', command_line='python a.py', cpu_num=1, task_name='task', cd_dir=0): + sh_content = \ + '#!/bin/sh\n' \ + +'#PBS -N '+task_name+'\n' \ + +'#PBS -l nodes=1:ppn='+str(cpu_num)+'\n' + if cd_dir==1: + sh_content += 'cd $PBS_O_WORKDIR\n' + sh_content += command_line + with open(sh_filename+'.sh', 'w') as f: + f.write(sh_content) + +# 创建一个sh文件用于提交任务(Slurm) +def make_sh_file_for_sbatch(sh_filename='a', command_line='python a.py', cpu_num=1, task_name='task', cd_dir=0): + sh_content = \ + '#!/bin/sh\n' \ + +'#SBATCH --job-name='+task_name+'\n' \ + +'#SBATCH --cpus-per-task='+str(cpu_num)+'\n' + if cd_dir==1: + sh_content += 'cd $PBS_O_WORKDIR\n' + sh_content += command_line + with open(sh_filename+'.sh', 'w') as f: + f.write(sh_content) + +# 创建一个sh文件用于提交任务(LSF) +def make_sh_file_for_bsub(sh_filename='a', command_line='python a.py', cpu_num=1, task_name='task', cd_dir=0, bsub_q=0, queue_name='score'): + sh_content = \ + '#!/bin/sh\n' \ + +'#BSUB -J '+task_name+'\n' \ + +'#BSUB -n 
'+str(cpu_num)+'\n' + if bsub_q==1: + sh_content += '#BSUB -q '+queue_name+'\n' + if cd_dir==1: + sh_content += 'cd $PBS_O_WORKDIR\n' + sh_content += command_line + with open(sh_filename+'.sh', 'w') as f: + f.write(sh_content) + +# qsub 提交任务(PBS) +def qsub_task(filename='a', file_format='.sh'): + import os + os.system('qsub '+filename+file_format) + +# sbatch 提交任务(Slurm) +def sbatch_task(filename='a', file_format='.sh'): + import os + os.system('sbatch '+filename+file_format) + +# bsub 提交任务(LSF) +def bsub_task(filename='a', file_format='.sh'): + import os + os.system('bsub < '+filename+file_format) + +# 复制.py和.sh文件,然后提交任务,实现半手动并行(PBS) +def copy_py_sh_file_and_qsub_task(parameter_array, py_filename='a', old_str_in_py='parameter = 0', new_str_in_py='parameter = ', sh_filename='a', task_name='task'): + import os + parameter_str_array = [] + for i0 in parameter_array: + parameter_str_array.append(str(i0)) + index = 0 + for parameter_str in parameter_str_array: + index += 1 + # copy python file + old_file = py_filename+'.py' + new_file = py_filename+'_'+str(index)+'.py' + os.system('cp '+old_file+' '+new_file) + with open(new_file, 'r') as f: + content = f.read() + old_str = old_str_in_py + new_str = new_str_in_py+parameter_str + content = content.replace(old_str, new_str) + with open(py_filename+'_'+str(index)+'.py', 'w') as f: + f.write(content) + # copy sh file + old_file = sh_filename+'.sh' + new_file = sh_filename+'_'+str(index)+'.sh' + os.system('cp '+old_file+' '+new_file) + with open(new_file, 'r') as f: + content = f.read() + old_str = 'python '+py_filename+'.py' + new_str = 'python '+py_filename+'_'+str(index)+'.py' + content = content.replace(old_str, new_str) + old_str = '#PBS -N '+task_name + new_str = '#PBS -N '+task_name+'_'+str(index) + content = content.replace(old_str, new_str) + with open(sh_filename+'_'+str(index)+'.sh', 'w') as f: + f.write(content) + # qsub task + os.system('qsub '+new_file) + +# 复制.py和.sh文件,然后提交任务,实现半手动并行(Slurm) +def copy_py_sh_file_and_sbatch_task(parameter_array, py_filename='a', old_str_in_py='parameter = 0', new_str_in_py='parameter = ', sh_filename='a', task_name='task'): + import os + parameter_str_array = [] + for i0 in parameter_array: + parameter_str_array.append(str(i0)) + index = 0 + for parameter_str in parameter_str_array: + index += 1 + # copy python file + old_file = py_filename+'.py' + new_file = py_filename+'_'+str(index)+'.py' + os.system('cp '+old_file+' '+new_file) + with open(new_file, 'r') as f: + content = f.read() + old_str = old_str_in_py + new_str = new_str_in_py+parameter_str + content = content.replace(old_str, new_str) + with open(py_filename+'_'+str(index)+'.py', 'w') as f: + f.write(content) + # copy sh file + old_file = sh_filename+'.sh' + new_file = sh_filename+'_'+str(index)+'.sh' + os.system('cp '+old_file+' '+new_file) + with open(new_file, 'r') as f: + content = f.read() + old_str = 'python '+py_filename+'.py' + new_str = 'python '+py_filename+'_'+str(index)+'.py' + content = content.replace(old_str, new_str) + old_str = '#SBATCH --job-name='+task_name + new_str = '#SBATCH --job-name='+task_name+'_'+str(index) + content = content.replace(old_str, new_str) + with open(sh_filename+'_'+str(index)+'.sh', 'w') as f: + f.write(content) + # sbatch task + os.system('sbatch '+new_file) + +# 复制.py和.sh文件,然后提交任务,实现半手动并行(LSF) +def copy_py_sh_file_and_bsub_task(parameter_array, py_filename='a', old_str_in_py='parameter = 0', new_str_in_py='parameter = ', sh_filename='a', task_name='task'): + import os + parameter_str_array = [] + 
for i0 in parameter_array: + parameter_str_array.append(str(i0)) + index = 0 + for parameter_str in parameter_str_array: + index += 1 + # copy python file + old_file = py_filename+'.py' + new_file = py_filename+'_'+str(index)+'.py' + os.system('cp '+old_file+' '+new_file) + with open(new_file, 'r') as f: + content = f.read() + old_str = old_str_in_py + new_str = new_str_in_py+parameter_str + content = content.replace(old_str, new_str) + with open(py_filename+'_'+str(index)+'.py', 'w') as f: + f.write(content) + # copy sh file + old_file = sh_filename+'.sh' + new_file = sh_filename+'_'+str(index)+'.sh' + os.system('cp '+old_file+' '+new_file) + with open(new_file, 'r') as f: + content = f.read() + old_str = 'python '+py_filename+'.py' + new_str = 'python '+py_filename+'_'+str(index)+'.py' + content = content.replace(old_str, new_str) + old_str = '#BSUB -J '+task_name + new_str = '#BSUB -J '+task_name+'_'+str(index) + content = content.replace(old_str, new_str) + with open(sh_filename+'_'+str(index)+'.sh', 'w') as f: + f.write(content) + # bsub task + os.system('bsub < '+new_file) + # 把矩阵写入.md文件(Markdown表格形式) def write_matrix_in_markdown_format(matrix, filename='a'): import numpy as np diff --git a/PyPI/src/guan/others.py b/PyPI/src/guan/others.py new file mode 100644 index 0000000..0d698b1 --- /dev/null +++ b/PyPI/src/guan/others.py @@ -0,0 +1,1187 @@ +# Module: others + +# AI 对话 +def chat(prompt='你好', model=1, stream=1, stream_label=0): + import requests + url = "http://api.guanjihuan.com/chat" + data = { + "prompt": prompt, + "model": model, + } + if stream == 1: + if stream_label == 1: + print('\n--- Start Chat Stream Message ---\n') + requests_response = requests.post(url, json=data, stream=True) + response = '' + if requests_response.status_code == 200: + for line in requests_response.iter_lines(): + if line: + if stream == 1: + print(line.decode('utf-8'), end='', flush=True) + response += line.decode('utf-8') + print() + else: + pass + if stream == 1: + if stream_label == 1: + print('\n--- End Chat Stream Message ---\n') + return response + +# 加上函数代码的 AI 对话 +def chat_with_function_code(function_name, prompt='', model=1, stream=1): + import guan + function_source = guan.get_source(function_name) + if prompt == '': + response = guan.chat(prompt=function_source, model=model, stream=stream) + else: + response = guan.chat(prompt=function_source+'\n\n'+prompt, model=model, stream=stream) + return response + +# 机器人自动对话 +def auto_chat(prompt='你好', round=2, model=1, stream=1): + import guan + response0 = prompt + for i0 in range(round): + print(f'\n【对话第 {i0+1} 轮】\n') + print('机器人 1: ') + response1 = guan.chat(prompt=response0, model=model, stream=stream) + print('机器人 2: ') + response0 = guan.chat(prompt=response1, model=model, stream=stream) + +# 机器人自动对话(引导对话) +def auto_chat_with_guide(prompt='你好', guide_message='(回答字数少于30个字,最后反问我一个问题)', round=5, model=1, stream=1): + import guan + response0 = prompt + for i0 in range(round): + print(f'\n【对话第 {i0+1} 轮】\n') + print('机器人 1: ') + response1 = guan.chat(prompt=response0+guide_message, model=model, stream=stream) + print('机器人 2: ') + response0 = guan.chat(prompt=response1+guide_message, model=model, stream=stream) + +# 在云端服务器上运行函数(需要函数是独立可运行的代码) +def run(function_name, *args, **kwargs): + import requests + import guan + url = "http://run.guanjihuan.com/run_function" + function_source = guan.get_source(function_name) + data = { + "function_name": function_name.__name__, + "function_source": function_source, + 'args': str(args), + 'kwargs': 
str(kwargs), + } + return_data = None + try: + response = requests.post(url, json=data) + if response.status_code == 200: + result = response.json() + print_data = result['print_data'] + print(print_data, end='') + encoded_return_data = result['encoded_return_data'] + import base64 + import pickle + return_data = pickle.loads(base64.b64decode(encoded_return_data)) + except: + pass + return return_data + +# CPU性能测试(十亿次循环的浮点加法运算的时间,约30秒左右) +def cpu_test_with_addition(print_show=1): + import time + result = 0.0 + start_time = time.time() + for _ in range(int(1e9)): + result += 1e-9 + end_time = time.time() + run_time = end_time - start_time + if print_show: + print(run_time) + return run_time + +# 拼接两个PDF文件 +def combine_two_pdf_files(input_file_1='a.pdf', input_file_2='b.pdf', output_file='combined_file.pdf'): + import PyPDF2 + output_pdf = PyPDF2.PdfWriter() + with open(input_file_1, 'rb') as file1: + pdf1 = PyPDF2.PdfReader(file1) + for page in range(len(pdf1.pages)): + output_pdf.add_page(pdf1.pages[page]) + with open(input_file_2, 'rb') as file2: + pdf2 = PyPDF2.PdfReader(file2) + for page in range(len(pdf2.pages)): + output_pdf.add_page(pdf2.pages[page]) + with open(output_file, 'wb') as combined_file: + output_pdf.write(combined_file) + +# 使用pdfminer3k将PDF文件转成文本 +def pdf_to_text_with_pdfminer3k(pdf_path): + from pdfminer.pdfparser import PDFParser, PDFDocument + from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter + from pdfminer.converter import PDFPageAggregator + from pdfminer.layout import LAParams, LTTextBox + from pdfminer.pdfinterp import PDFTextExtractionNotAllowed + import logging + logging.Logger.propagate = False + logging.getLogger().setLevel(logging.ERROR) + praser = PDFParser(open(pdf_path, 'rb')) + doc = PDFDocument() + praser.set_document(doc) + doc.set_parser(praser) + doc.initialize() + if not doc.is_extractable: + raise PDFTextExtractionNotAllowed + else: + rsrcmgr = PDFResourceManager() + laparams = LAParams() + device = PDFPageAggregator(rsrcmgr, laparams=laparams) + interpreter = PDFPageInterpreter(rsrcmgr, device) + content = '' + for page in doc.get_pages(): + interpreter.process_page(page) + layout = device.get_result() + for x in layout: + if isinstance(x, LTTextBox): + content = content + x.get_text().strip() + return content + +# 使用PyPDF2将PDF文件转成文本 +def pdf_to_text_with_PyPDF2_for_all_pages(pdf_path): + import guan + num_pages = guan.get_pdf_page_number(pdf_path) + content = '' + for i0 in range(num_pages): + page_text = guan.pdf_to_txt_for_a_specific_page(pdf_path, page_num=i0+1) + content += page_text + '\n\n' + return content + +# 获取PDF文件页数 +def get_pdf_page_number(pdf_path): + import PyPDF2 + pdf_file = open(pdf_path, 'rb') + pdf_reader = PyPDF2.PdfReader(pdf_file) + num_pages = len(pdf_reader.pages) + return num_pages + +# 获取PDF文件指定页面的内容 +def pdf_to_txt_for_a_specific_page(pdf_path, page_num=1): + import PyPDF2 + pdf_file = open(pdf_path, 'rb') + pdf_reader = PyPDF2.PdfReader(pdf_file) + num_pages = len(pdf_reader.pages) + for page_num0 in range(num_pages): + if page_num0 == page_num-1: + page = pdf_reader.pages[page_num0] + page_text = page.extract_text() + pdf_file.close() + return page_text + +# 获取PDF文献中的链接。例如: link_starting_form='https://doi.org' +def get_links_from_pdf(pdf_path, link_starting_form=''): + import PyPDF2 + import re + reader = PyPDF2.PdfReader(pdf_path) + pages = len(reader.pages) + i0 = 0 + links = [] + for page in range(pages): + pageSliced = reader.pages[page] + pageObject = pageSliced.get_object() + if '/Annots' in 
pageObject.keys():
+            ann = pageObject['/Annots']
+            old = ''
+            for a in ann:
+                u = a.get_object()
+                if '/A' in u.keys():
+                    if '/URI' in u['/A']:
+                        if re.search(re.compile('^'+link_starting_form), u['/A']['/URI']):
+                            if u['/A']['/URI'] != old:
+                                links.append(u['/A']['/URI'])
+                                i0 += 1
+                                old = u['/A']['/URI']
+    return links
+
+# 获取当前日期字符串
+def get_date(bar=True):
+    import datetime
+    datetime_date = str(datetime.date.today())
+    if bar==False:
+        datetime_date = datetime_date.replace('-', '')
+    return datetime_date
+
+# 获取当前时间字符串
+def get_time(colon=True):
+    import datetime
+    datetime_time = datetime.datetime.now().strftime('%H:%M:%S')
+    if colon==False:
+        datetime_time = datetime_time.replace(':', '')
+    return datetime_time
+
+# 获取本月的所有日期
+def get_date_array_of_the_current_month(str_or_datetime='str'):
+    import datetime
+    today = datetime.date.today()
+    first_day_of_month = today.replace(day=1)
+    if first_day_of_month.month == 12:
+        next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1)
+    else:
+        next_month = first_day_of_month.replace(month=first_day_of_month.month + 1)
+    current_date = first_day_of_month
+    date_array = []
+    while current_date < next_month:
+        if str_or_datetime=='str':
+            date_array.append(str(current_date))
+        elif str_or_datetime=='datetime':
+            date_array.append(current_date)
+        current_date += datetime.timedelta(days=1)
+    return date_array
+
+# 获取上个月份
+def get_last_month():
+    import datetime
+    today = datetime.date.today()
+    last_month = today.month - 1
+    if last_month == 0:
+        last_month = 12
+        year_of_last_month = today.year - 1
+    else:
+        year_of_last_month = today.year
+    return year_of_last_month, last_month
+
+# 获取上上个月份
+def get_the_month_before_last():
+    import datetime
+    today = datetime.date.today()
+    the_month_before_last = today.month - 2
+    if the_month_before_last == 0:
+        the_month_before_last = 12
+        year_of_the_month_before_last = today.year - 1
+    elif the_month_before_last == -1:
+        the_month_before_last = 11
+        year_of_the_month_before_last = today.year - 1
+    else:
+        year_of_the_month_before_last = today.year
+    return year_of_the_month_before_last, the_month_before_last
+
+# 获取上个月的所有日期
+def get_date_array_of_the_last_month(str_or_datetime='str'):
+    import datetime
+    import guan
+    today = datetime.date.today()
+    year_of_last_month, last_month = guan.get_last_month()
+    first_day_of_month = today.replace(year=year_of_last_month, month=last_month, day=1)
+    if first_day_of_month.month == 12:
+        next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1)
+    else:
+        next_month = first_day_of_month.replace(month=first_day_of_month.month + 1)
+    current_date = first_day_of_month
+    date_array = []
+    while current_date < next_month:
+        if str_or_datetime=='str':
+            date_array.append(str(current_date))
+        elif str_or_datetime=='datetime':
+            date_array.append(current_date)
+        current_date += datetime.timedelta(days=1)
+    return date_array
+
+# 获取上上个月的所有日期
+def get_date_array_of_the_month_before_last(str_or_datetime='str'):
+    import datetime
+    import guan
+    today = datetime.date.today()
+    year_of_last_last_month, last_last_month = guan.get_the_month_before_last()
+    first_day_of_month = today.replace(year=year_of_last_last_month, month=last_last_month, day=1)
+    if first_day_of_month.month == 12:
+        next_month = first_day_of_month.replace(year=first_day_of_month.year + 1, month=1)
+    else:
+        next_month = first_day_of_month.replace(month=first_day_of_month.month + 1)
+    current_date = 
first_day_of_month + date_array = [] + while current_date < next_month: + if str_or_datetime=='str': + date_array.append(str(current_date)) + elif str_or_datetime=='datetime': + date_array.append(current_date) + current_date += datetime.timedelta(days=1) + return date_array + +# 根据新的日期,填充数组中缺少的数据为零 +def fill_zero_data_for_new_dates(old_dates, new_dates, old_data_array): + new_data_array = [] + for date in new_dates: + if str(date) not in old_dates: + new_data_array.append(0) + else: + index = old_dates.index(date) + new_data_array.append(old_data_array[index]) + return new_data_array + +# 获取内存信息 +def get_memory_info(): + import psutil + memory_info = psutil.virtual_memory() + total_memory = memory_info.total/(1024**2) + used_memory = memory_info.used/(1024**2) + available_memory = memory_info.available/(1024**2) + used_memory_percent = memory_info.percent + return total_memory, used_memory, available_memory, used_memory_percent + +# 获取CPU的平均使用率 +def get_cpu_usage(interval=1): + import psutil + cpu_usage = psutil.cpu_percent(interval=interval) + return cpu_usage + +# 获取每个CPU核心的使用率,返回列表 +def get_cpu_usage_array_per_core(interval=1): + import psutil + cpu_usage_array_per_core = psutil.cpu_percent(interval=interval, percpu=True) + return cpu_usage_array_per_core + +# 获取使用率最高的CPU核心的使用率 +def get_cpu_max_usage_for_all_cores(interval=1): + import guan + cpu_usage_array_per_core = guan.get_cpu_usage_array_per_core(interval=interval) + max_cpu_usage = max(cpu_usage_array_per_core) + return max_cpu_usage + +# 获取非零使用率的CPU核心的平均使用率 +def get_cpu_averaged_usage_for_non_zero_cores(interval=1): + import guan + cpu_usage_array_per_core = guan.get_cpu_usage_array_per_core(interval=interval) + cpu_usage_array_per_core_new = guan.remove_item_in_one_array(cpu_usage_array_per_core, 0.0) + averaged_cpu_usage = sum(cpu_usage_array_per_core_new)/len(cpu_usage_array_per_core_new) + return averaged_cpu_usage + +# 在一定数量周期内得到CPU的使用率信息。默认为1秒钟收集一次,(interval+sleep_interval)*times 为收集的时间范围,范围默认为60秒,即1分钟后返回列表,总共得到60组数据。其中,数字第一列和第二列分别是平均值和最大值。 +def get_cpu_information_for_times(interval=1, sleep_interval=0, times=60): + import guan + import time + cpu_information_array = [] + for _ in range(times): + cpu_information = [] + datetime_date = guan.get_date() + datetime_time = guan.get_time() + cpu_information.append(datetime_date) + cpu_information.append(datetime_time) + cpu_usage_array_per_core = guan.get_cpu_usage_array_per_core(interval=interval) + cpu_information.append(sum(cpu_usage_array_per_core)/len(cpu_usage_array_per_core)) + cpu_information.append(max(cpu_usage_array_per_core)) + for cpu_usage in cpu_usage_array_per_core: + cpu_information.append(cpu_usage) + cpu_information_array.append(cpu_information) + time.sleep(sleep_interval) + return cpu_information_array + +# 将得到的CPU的使用率信息写入文件。默认为1分钟收集一次,(interval+sleep_interval)*times 为收集的时间范围,范围默认为60分钟,即1小时写入文件一次,总共得到60组数据。其中,数字第一列和第二列分别是平均值和最大值。 +def write_cpu_information_to_file(filename='./cpu_usage', interval=1, sleep_interval=59, times=60): + import guan + guan.make_file(filename+'.txt') + while True: + f = guan.open_file(filename) + cpu_information_array = guan.get_cpu_information_for_times(interval=interval, sleep_interval=sleep_interval, times=times) + for cpu_information in cpu_information_array: + i0 = 0 + for information in cpu_information: + if i0 < 2: + f.write(str(information)+' ') + else: + f.write(f'{information:.1f} ') + i0 += 1 + f.write('\n') + f.close() + +# 画CPU的使用率图。默认为画最近的60个数据,以及不画CPU核心的最大使用率。 +def plot_cpu_information(filename='./cpu_usage', 
recent_num=60, max_cpu=0): + import guan + from datetime import datetime + with open(filename+".txt", "r") as file: + lines = file.readlines() + lines = lines[::-1] + timestamps_array = [] + averaged_cpu_usage_array = [] + max_cpu_usage_array = [] + i0 = 0 + for line in lines: + i0 += 1 + if i0 >= recent_num: + break + cpu_information = line.strip() + information = cpu_information.split() + time_str = information[0]+' '+information[1] + time_format = "%Y-%m-%d %H:%M:%S" + timestamps_array.append(datetime.strptime(time_str, time_format)) + averaged_cpu_usage_array.append(float(information[2])) + max_cpu_usage_array.append(float(information[3])) + plt, fig, ax = guan.import_plt_and_start_fig_ax(adjust_bottom=0.3, adjust_left=0.15, labelsize=16, fontfamily='Times New Roman') + plt.xticks(rotation=90) + guan.plot_without_starting_fig_ax(plt, fig, ax, timestamps_array, averaged_cpu_usage_array, style='o-') + legend_array = ['Averaged'] + if max_cpu == 1: + guan.plot_without_starting_fig_ax(plt, fig, ax, timestamps_array, max_cpu_usage_array, style='o-') + legend_array.append('Max') + guan.plot_without_starting_fig_ax(plt, fig, ax, [], [], xlabel='Time', ylabel='CPU usage', fontsize=20) + plt.legend(legend_array) + plt.show() + +# 画详细的CPU的使用率图,分CPU核心画图。 +def plot_detailed_cpu_information(filename='./cpu_usage', recent_num=60): + import guan + from datetime import datetime + with open(filename+".txt", "r") as file: + lines = file.readlines() + lines = lines[::-1] + timestamps_array = [] + i0 = 0 + core_num = len(lines[0].strip().split())-4 + detailed_cpu_usage_array = [] + for line in lines: + i0 += 1 + if i0 > recent_num: + break + cpu_information = line.strip() + information = cpu_information.split() + time_str = information[0]+' '+information[1] + time_format = "%Y-%m-%d %H:%M:%S" + timestamps_array.append(datetime.strptime(time_str, time_format)) + detailed_cpu_usage = [] + for core in range(core_num): + detailed_cpu_usage.append(float(information[4+core])) + detailed_cpu_usage_array.append(detailed_cpu_usage) + for core in range(core_num): + plt, fig, ax = guan.import_plt_and_start_fig_ax(adjust_bottom=0.3, adjust_left=0.15, labelsize=16, fontfamily='Times New Roman') + plt.xticks(rotation=90) + guan.plot_without_starting_fig_ax(plt, fig, ax, timestamps_array, [row[core] for row in detailed_cpu_usage_array], style='o-') + legend_array = [] + legend_array.append(f'CPU {core+1}') + guan.plot_without_starting_fig_ax(plt, fig, ax, [], [], xlabel='Time', ylabel='CPU usage', fontsize=20) + plt.legend(legend_array) + plt.show() + +# 获取MAC地址 +def get_mac_address(): + import uuid + mac_address = uuid.UUID(int=uuid.getnode()).hex[-12:].upper() + mac_address = '-'.join([mac_address[i:i+2] for i in range(0, 11, 2)]) + return mac_address + +# 获取软件包中的所有模块名 +def get_all_modules_in_one_package(package_name='guan'): + import pkgutil + package = __import__(package_name) + module_names = [name for _, name, _ in pkgutil.iter_modules(package.__path__)] + return module_names + +# 获取软件包中一个模块的所有函数名 +def get_all_functions_in_one_module(module_name, package_name='guan'): + import inspect + function_names = [] + module = __import__(f"{package_name}.{module_name}", fromlist=[""]) + for name, obj in inspect.getmembers(module): + if inspect.isfunction(obj): + function_names.append(name) + return function_names + +# 获取软件包中的所有函数名 +def get_all_functions_in_one_package(package_name='guan', print_show=1): + import guan + module_names = guan.get_all_modules_in_one_package(package_name=package_name) + all_function_names = [] + 
for module_name in module_names: + function_names = guan.get_all_functions_in_one_module(module_name, package_name='guan') + if print_show == 1: + print('Module:', module_name) + for name in function_names: + all_function_names.append(name) + if print_show == 1: + print('function:', name) + if print_show == 1: + print() + return all_function_names + +# 获取调用本函数的函数名 +def get_calling_function_name(layer=1): + import inspect + caller = inspect.stack()[layer] + calling_function_name = caller.function + return calling_function_name + +# 统计Python文件中import的数量并排序 +def count_number_of_import_statements(filename, file_format='.py', num=1000): + with open(filename+file_format, 'r') as file: + lines = file.readlines() + import_array = [] + for line in lines: + if 'import ' in line: + line = line.strip() + import_array.append(line) + from collections import Counter + import_statement_counter = Counter(import_array).most_common(num) + return import_statement_counter + +# 获取软件包的本机版本 +def get_current_version(package_name='guan'): + import importlib.metadata + try: + current_version = importlib.metadata.version(package_name) + return current_version + except: + return None + +# 获取Python软件包的最新版本 +def get_latest_version(package_name='guan', timeout=5): + import requests + url = f"https://pypi.org/pypi/{package_name}/json" + try: + response = requests.get(url, timeout=timeout) + except: + return None + if response.status_code == 200: + data = response.json() + latest_version = data["info"]["version"] + return latest_version + else: + return None + +# 获取包含某个字符的进程PID值 +def get_PID_array(name): + import subprocess + command = "ps -ef | grep "+name + result = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + if result.returncode == 0: + ps_ef = result.stdout + import re + ps_ef_1 = re.split(r'\n', ps_ef) + id_running_array = [] + for ps_ef_item in ps_ef_1: + if ps_ef_item != '': + ps_ef_2 = re.split(r'\s+', ps_ef_item) + id_running_array.append(ps_ef_2[1]) + return id_running_array + +# 寻找所有的git仓库 +def find_git_repositories(base_path='./', ignored_directory_with_words=[]): + import os + git_repository_array = [] + for root, dirs, files in os.walk(base_path): + if '.git' in dirs: + ignore_signal = 0 + for word in ignored_directory_with_words: + if word in root: + ignore_signal = 1 + break + if ignore_signal == 0: + git_repository_array.append(root) + return git_repository_array + +# 在git仓库列表中找到有修改待commit的 +def get_git_repositories_to_commit(git_repository_array): + import os + import subprocess + git_repository_array_to_commit = [] + for repository in git_repository_array: + os.chdir(repository) + status = subprocess.check_output(['git', 'status']).decode('utf-8') + if 'nothing to commit, working tree clean' in status: + pass + else: + git_repository_array_to_commit.append(repository) + return git_repository_array_to_commit + +# 每日git commit次数的统计 +def statistics_of_git_commits(print_show=0, str_or_datetime='str'): + import subprocess + import collections + since_date = '100 year ago' + result = subprocess.run( + ['git', 'log', f'--since={since_date}', '--pretty=format:%ad', '--date=short'], + stdout=subprocess.PIPE, + text=True) + commits = result.stdout.strip().split('\n') + counter = collections.Counter(commits) + daily_commit_counts = dict(sorted(counter.items())) + date_array = [] + commit_count_array = [] + for date, count in daily_commit_counts.items(): + if print_show == 1: + print(f"{date}: {count} commits") + if str_or_datetime=='datetime': + import datetime + 
date_array.append(datetime.datetime.strptime(date, "%Y-%m-%d")) + elif str_or_datetime=='str': + date_array.append(date) + commit_count_array.append(count) + return date_array, commit_count_array + +# 将文件目录结构写入Markdown文件 +def write_file_list_in_markdown(directory='./', filename='a', reverse_positive_or_negative=1, starting_from_h1=None, banned_file_format=[], hide_file_format=None, divided_line=None, show_second_number=None, show_third_number=None): + import os + f = open(filename+'.md', 'w', encoding="utf-8") + filenames1 = os.listdir(directory) + u0 = 0 + for filename1 in filenames1[::reverse_positive_or_negative]: + filename1_with_path = os.path.join(directory,filename1) + if os.path.isfile(filename1_with_path): + if os.path.splitext(filename1)[1] not in banned_file_format: + if hide_file_format == None: + f.write('+ '+str(filename1)+'\n\n') + else: + f.write('+ '+str(os.path.splitext(filename1)[0])+'\n\n') + else: + u0 += 1 + if divided_line != None and u0 != 1: + f.write('--------\n\n') + if starting_from_h1 == None: + f.write('#') + f.write('# '+str(filename1)+'\n\n') + + filenames2 = os.listdir(filename1_with_path) + i0 = 0 + for filename2 in filenames2[::reverse_positive_or_negative]: + filename2_with_path = os.path.join(directory, filename1, filename2) + if os.path.isfile(filename2_with_path): + if os.path.splitext(filename2)[1] not in banned_file_format: + if hide_file_format == None: + f.write('+ '+str(filename2)+'\n\n') + else: + f.write('+ '+str(os.path.splitext(filename2)[0])+'\n\n') + else: + i0 += 1 + if starting_from_h1 == None: + f.write('#') + if show_second_number != None: + f.write('## '+str(i0)+'. '+str(filename2)+'\n\n') + else: + f.write('## '+str(filename2)+'\n\n') + + j0 = 0 + filenames3 = os.listdir(filename2_with_path) + for filename3 in filenames3[::reverse_positive_or_negative]: + filename3_with_path = os.path.join(directory, filename1, filename2, filename3) + if os.path.isfile(filename3_with_path): + if os.path.splitext(filename3)[1] not in banned_file_format: + if hide_file_format == None: + f.write('+ '+str(filename3)+'\n\n') + else: + f.write('+ '+str(os.path.splitext(filename3)[0])+'\n\n') + else: + j0 += 1 + if starting_from_h1 == None: + f.write('#') + if show_third_number != None: + f.write('### ('+str(j0)+') '+str(filename3)+'\n\n') + else: + f.write('### '+str(filename3)+'\n\n') + + filenames4 = os.listdir(filename3_with_path) + for filename4 in filenames4[::reverse_positive_or_negative]: + filename4_with_path = os.path.join(directory, filename1, filename2, filename3, filename4) + if os.path.isfile(filename4_with_path): + if os.path.splitext(filename4)[1] not in banned_file_format: + if hide_file_format == None: + f.write('+ '+str(filename4)+'\n\n') + else: + f.write('+ '+str(os.path.splitext(filename4)[0])+'\n\n') + else: + if starting_from_h1 == None: + f.write('#') + f.write('#### '+str(filename4)+'\n\n') + + filenames5 = os.listdir(filename4_with_path) + for filename5 in filenames5[::reverse_positive_or_negative]: + filename5_with_path = os.path.join(directory, filename1, filename2, filename3, filename4, filename5) + if os.path.isfile(filename5_with_path): + if os.path.splitext(filename5)[1] not in banned_file_format: + if hide_file_format == None: + f.write('+ '+str(filename5)+'\n\n') + else: + f.write('+ '+str(os.path.splitext(filename5)[0])+'\n\n') + else: + if starting_from_h1 == None: + f.write('#') + f.write('##### '+str(filename5)+'\n\n') + + filenames6 = os.listdir(filename5_with_path) + for filename6 in 
filenames6[::reverse_positive_or_negative]: + filename6_with_path = os.path.join(directory, filename1, filename2, filename3, filename4, filename5, filename6) + if os.path.isfile(filename6_with_path): + if os.path.splitext(filename6)[1] not in banned_file_format: + if hide_file_format == None: + f.write('+ '+str(filename6)+'\n\n') + else: + f.write('+ '+str(os.path.splitext(filename6)[0])+'\n\n') + else: + if starting_from_h1 == None: + f.write('#') + f.write('###### '+str(filename6)+'\n\n') + f.close() + +# 从网页的标签中获取内容 +def get_html_from_tags(link, tags=['title', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'li', 'a']): + from bs4 import BeautifulSoup + import urllib.request + import ssl + ssl._create_default_https_context = ssl._create_unverified_context + html = urllib.request.urlopen(link).read().decode('utf-8') + soup = BeautifulSoup(html, features="lxml") + all_tags = soup.find_all(tags) + content = '' + for tag in all_tags: + text = tag.get_text().replace('\n', '') + if content == '': + content = text + else: + content = content + '\n\n' + text + return content + +# 从HTML中获取所有的链接 +def get_links_from_html(html_link, links_with_text=0): + from bs4 import BeautifulSoup + import urllib.request + import ssl + ssl._create_default_https_context = ssl._create_unverified_context + html = urllib.request.urlopen(html_link).read().decode('utf-8') + soup = BeautifulSoup(html, features="lxml") + a_tags = soup.find_all('a') + if links_with_text == 0: + link_array = [tag.get('href') for tag in a_tags if tag.get('href')] + return link_array + else: + link_array_with_text = [(tag.get('href'), tag.text) for tag in a_tags if tag.get('href')] + return link_array_with_text + +# 检查链接的有效性 +def check_link(url, timeout=3, allow_redirects=True): + import requests + try: + response = requests.head(url, timeout=timeout, allow_redirects=allow_redirects) + if response.status_code == 200: + return True + else: + return False + except requests.exceptions.RequestException: + return False + +# 检查链接数组中链接的有效性 +def check_link_array(link_array, timeout=3, allow_redirects=True, try_again=0, print_show=1): + import guan + failed_link_array0 = [] + for link in link_array: + if link=='#' or guan.check_link(link, timeout=timeout, allow_redirects=allow_redirects): + pass + else: + failed_link_array0.append(link) + if print_show: + print(link) + failed_link_array = [] + if try_again: + if print_show: + print('\nTry again:\n') + for link in failed_link_array0: + if link=='#' or guan.check_link(link, timeout=timeout, allow_redirects=allow_redirects): + pass + else: + failed_link_array.append(link) + if print_show: + print(link) + else: + failed_link_array = failed_link_array0 + return failed_link_array + +# 生成二维码 +def creat_qrcode(data="https://www.guanjihuan.com", filename='a', file_format='.png'): + import qrcode + img = qrcode.make(data) + img.save(filename+file_format) + +# 通过Sci-Hub网站下载文献 +def download_with_scihub(address=None, num=1): + from bs4 import BeautifulSoup + import re + import requests + import os + if num==1 and address!=None: + address_array = [address] + else: + address_array = [] + for i in range(num): + address = input('\nInput:') + address_array.append(address) + for address in address_array: + r = requests.post('https://sci-hub.st/', data={'request': address}) + print('\nResponse:', r) + print('Address:', r.url) + soup = BeautifulSoup(r.text, features='lxml') + pdf_URL = soup.embed['src'] + # pdf_URL = soup.iframe['src'] # This is a code line of history version which fails to get pdf URL. 
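+        # The address extracted from the embed tag is often protocol-relative (it may start with '//'),
+        # so the check below prepends 'https:' only when the scheme is missing.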
+ if re.search(re.compile('^https:'), pdf_URL): + pass + else: + pdf_URL = 'https:'+pdf_URL + print('PDF address:', pdf_URL) + name = re.search(re.compile('fdp.*?/'),pdf_URL[::-1]).group()[::-1][1::] + print('PDF name:', name) + print('Directory:', os.getcwd()) + print('\nDownloading...') + r = requests.get(pdf_URL, stream=True) + with open(name, 'wb') as f: + for chunk in r.iter_content(chunk_size=32): + f.write(chunk) + print('Completed!\n') + if num != 1: + print('All completed!\n') + +# 将字符串转成音频 +def str_to_audio(str='hello world', filename='str', rate=125, voice=1, read=1, save=0, compress=0, bitrate='16k', print_text=0): + import pyttsx3 + import guan + if print_text==1: + print(str) + engine = pyttsx3.init() + voices = engine.getProperty('voices') + engine.setProperty('voice', voices[voice].id) + engine.setProperty("rate", rate) + if save==1: + engine.save_to_file(str, filename+'.wav') + engine.runAndWait() + print('Wav file saved!') + if compress==1: + import os + os.rename(filename+'.wav', 'temp.wav') + guan.compress_wav_to_mp3('temp.wav', output_filename=filename+'.mp3', bitrate=bitrate) + os.remove('temp.wav') + if read==1: + engine.say(str) + engine.runAndWait() + +# 将txt文件转成音频 +def txt_to_audio(txt_path, rate=125, voice=1, read=1, save=0, compress=0, bitrate='16k', print_text=0): + import pyttsx3 + import guan + f = open(txt_path, 'r', encoding ='utf-8') + text = f.read() + if print_text==1: + print(text) + engine = pyttsx3.init() + voices = engine.getProperty('voices') + engine.setProperty('voice', voices[voice].id) + engine.setProperty("rate", rate) + if save==1: + import re + filename = re.split('[/,\\\]', txt_path)[-1][:-4] + engine.save_to_file(text, filename+'.wav') + engine.runAndWait() + print('Wav file saved!') + if compress==1: + import os + os.rename(filename+'.wav', 'temp.wav') + guan.compress_wav_to_mp3('temp.wav', output_filename=filename+'.mp3', bitrate=bitrate) + os.remove('temp.wav') + if read==1: + engine.say(text) + engine.runAndWait() + +# 将PDF文件转成音频 +def pdf_to_audio(pdf_path, rate=125, voice=1, read=1, save=0, compress=0, bitrate='16k', print_text=0): + import pyttsx3 + import guan + text = guan.pdf_to_text(pdf_path) + text = text.replace('\n', ' ') + if print_text==1: + print(text) + engine = pyttsx3.init() + voices = engine.getProperty('voices') + engine.setProperty('voice', voices[voice].id) + engine.setProperty("rate", rate) + if save==1: + import re + filename = re.split('[/,\\\]', pdf_path)[-1][:-4] + engine.save_to_file(text, filename+'.wav') + engine.runAndWait() + print('Wav file saved!') + if compress==1: + import os + os.rename(filename+'.wav', 'temp.wav') + guan.compress_wav_to_mp3('temp.wav', output_filename=filename+'.mp3', bitrate=bitrate) + os.remove('temp.wav') + if read==1: + engine.say(text) + engine.runAndWait() + +# 将wav音频文件压缩成MP3音频文件 +def compress_wav_to_mp3(wav_path, output_filename='a.mp3', bitrate='16k'): + # Note: Beside the installation of pydub, you may also need download FFmpeg on http://www.ffmpeg.org/download.html and add the bin path to the environment variable. 
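+    # Usage sketch (assuming an existing 'a.wav' in the working directory):
+    #     import guan
+    #     guan.compress_wav_to_mp3('a.wav', output_filename='a.mp3', bitrate='16k')
+    # Whether the external ffmpeg binary is visible to pydub can be checked with shutil.which('ffmpeg') from the standard library.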
+    from pydub import AudioSegment
+    sound = AudioSegment.from_wav(wav_path)
+    sound.export(output_filename,format="mp3",bitrate=bitrate)
+
+# 将WordPress导出的XML格式文件转换成多个MarkDown格式的文件
+def convert_wordpress_xml_to_markdown(xml_file='./a.xml', convert_content=1, replace_more=[]):
+    import xml.etree.ElementTree as ET
+    import re
+    tree = ET.parse(xml_file)
+    root = tree.getroot()
+    for item in root.findall('.//item'):
+        title = item.find('title').text
+        content = item.find('.//content:encoded', namespaces={'content': 'http://purl.org/rss/1.0/modules/content/'}).text
+        if convert_content == 1:
+            try:
+                content = re.sub(r'<!--.*?-->', '', content)
+                content = content.replace('<p>', '')
+                content = content.replace('</p>', '')
+                content = content.replace('<ul>', '')
+                content = content.replace('</ul>', '')
+                content = content.replace('<ol>', '')
+                content = content.replace('</ol>', '')
+                content = content.replace('<strong>', '')
+                content = content.replace('</strong>', '')
+                content = content.replace('<li>', '+ ')
+                content = content.replace('</li>', '')
+                content = re.sub(r'<h2.*?>', '## ', content)
+                content = re.sub(r'<h3.*?>', '### ', content)
+                content = re.sub(r'<h4.*?>', '#### ', content)
+                for replace_item in replace_more:
+                    content = content.replace(replace_item, '')
+                for _ in range(100):
+                    content = content.replace('\n\n\n', '\n\n')
+            except:
+                print(f'提示:字符串替换出现问题!出现问题的内容为:{content}')
+        else:
+            pass
+        markdown_content = f"# {title}\n{content}"
+        markdown_file_path = f"{title}.md"
+        cleaned_filename = re.sub(r'[/:*?"<>|\'\\]', ' ', markdown_file_path)
+        with open(cleaned_filename, 'w', encoding='utf-8') as md_file:
+            md_file.write(markdown_content)
+
+# 凯利公式
+def kelly_formula(p, b, a=1):
+    f=(p/a)-((1-p)/b)
+    return f
+
+# 获取所有股票
+def all_stocks():
+    import numpy as np
+    import akshare as ak
+    stocks = ak.stock_zh_a_spot_em()
+    title = np.array(stocks.columns)
+    stock_data = stocks.values
+    return title, stock_data
+
+# 获取所有股票的代码
+def all_stock_symbols():
+    import guan
+    title, stock_data = guan.all_stocks()
+    stock_symbols = stock_data[:, 1]
+    return stock_symbols
+
+# 股票代码的分类
+def stock_symbols_classification():
+    import guan
+    import re
+    stock_symbols = guan.all_stock_symbols()
+    # 上交所主板
+    stock_symbols_60 = []
+    for stock_symbol in stock_symbols:
+        find_600 = re.findall(r'^600', stock_symbol)
+        find_601 = re.findall(r'^601', stock_symbol)
+        find_603 = re.findall(r'^603', stock_symbol)
+        find_605 = re.findall(r'^605', stock_symbol)
+        if find_600 != [] or find_601 != [] or find_603 != [] or find_605 != []:
+            stock_symbols_60.append(stock_symbol)
+    # 深交所主板
+    stock_symbols_00 = []
+    for stock_symbol in stock_symbols:
+        find_000 = re.findall(r'^000', stock_symbol)
+        find_001 = re.findall(r'^001', stock_symbol)
+        find_002 = re.findall(r'^002', stock_symbol)
+        find_003 = re.findall(r'^003', stock_symbol)
+        if find_000 != [] or find_001 != [] or find_002 != [] or find_003 != []:
+            stock_symbols_00.append(stock_symbol)
+    # 创业板
+    stock_symbols_30 = []
+    for stock_symbol in stock_symbols:
+        find_300 = re.findall(r'^300', stock_symbol)
+        find_301 = re.findall(r'^301', stock_symbol)
+        if find_300 != [] or find_301 != []:
+            stock_symbols_30.append(stock_symbol)
+    # 科创板
+    stock_symbols_68 = []
+    for stock_symbol in stock_symbols:
+        find_688 = re.findall(r'^688', stock_symbol)
+        find_689 = re.findall(r'^689', stock_symbol)
+        if find_688 != [] or find_689 != []:
+            stock_symbols_68.append(stock_symbol)
+    # 新三板
+    stock_symbols_8_4 = []
+    for stock_symbol in stock_symbols:
+        find_82 = re.findall(r'^82', stock_symbol)
+        find_83 = re.findall(r'^83', stock_symbol)
+        find_87 = re.findall(r'^87', stock_symbol)
+        find_88 = re.findall(r'^88', stock_symbol)
+        find_430 = re.findall(r'^430', stock_symbol)
+        find_420 = re.findall(r'^420', stock_symbol)
+        find_400 = re.findall(r'^400', stock_symbol)
+        if find_82 != [] or find_83 != [] or find_87 != [] or find_88 != [] or find_430 != [] or find_420 != [] or find_400 != []:
+            stock_symbols_8_4.append(stock_symbol)
+    # 检查遗漏的股票代码
+    stock_symbols_others = []
+    for stock_symbol in stock_symbols:
+        if stock_symbol not in stock_symbols_60 and stock_symbol not in stock_symbols_00 and stock_symbol not in stock_symbols_30 and stock_symbol not in stock_symbols_68 and stock_symbol not in stock_symbols_8_4:
+            stock_symbols_others.append(stock_symbol)
+    return stock_symbols_60, stock_symbols_00, stock_symbols_30, stock_symbols_68, stock_symbols_8_4, stock_symbols_others
+
+# 股票代码各个分类的数量
+def statistics_of_stock_symbols_classification():
+    import guan
+    stock_symbols_60, 
stock_symbols_00, stock_symbols_30, stock_symbols_68, stock_symbols_8_4, stock_symbols_others = guan.stock_symbols_classification() + num_stocks_60 = len(stock_symbols_60) + num_stocks_00 = len(stock_symbols_00) + num_stocks_30 = len(stock_symbols_30) + num_stocks_68 = len(stock_symbols_68) + num_stocks_8_4 = len(stock_symbols_8_4) + num_stocks_others= len(stock_symbols_others) + return num_stocks_60, num_stocks_00, num_stocks_30, num_stocks_68, num_stocks_8_4, num_stocks_others + +# 从股票代码获取股票名称 +def find_stock_name_from_symbol(symbol='000002'): + import guan + title, stock_data = guan.all_stocks() + for stock in stock_data: + if symbol in stock: + stock_name = stock[2] + return stock_name + +# 市值排序 +def sorted_market_capitalization(num=10): + import numpy as np + import guan + title, stock_data = guan.all_stocks() + new_stock_data = [] + for stock in stock_data: + if np.isnan(float(stock[9])): + continue + else: + new_stock_data.append(stock) + new_stock_data = np.array(new_stock_data) + list_index = np.argsort(new_stock_data[:, 17]) + list_index = list_index[::-1] + if num == None: + num = len(list_index) + sorted_array = [] + for i0 in range(num): + stock_symbol = new_stock_data[list_index[i0], 1] + stock_name = new_stock_data[list_index[i0], 2] + market_capitalization = new_stock_data[list_index[i0], 17]/1e8 + sorted_array.append([i0+1, stock_symbol, stock_name, market_capitalization]) + return sorted_array + +# 美股市值排序 +def sorted_market_capitalization_us(num=10): + import akshare as ak + import numpy as np + stocks = ak.stock_us_spot_em() + stock_data = stocks.values + new_stock_data = [] + for stock in stock_data: + if np.isnan(float(stock[9])): + continue + else: + new_stock_data.append(stock) + new_stock_data = np.array(new_stock_data) + list_index = np.argsort(new_stock_data[:, 9]) + list_index = list_index[::-1] + if num == None: + num = len(list_index) + sorted_array = [] + for i0 in range(num): + stock_symbol = new_stock_data[list_index[i0], 15] + stock_name = new_stock_data[list_index[i0], 1] + market_capitalization = new_stock_data[list_index[i0], 9]/1e8 + sorted_array.append([i0+1, stock_symbol, stock_name, market_capitalization]) + return sorted_array + +# 获取单个股票的历史数据 +def history_data_of_one_stock(symbol='000002', period='daily', start_date="19000101", end_date='21000101'): + # period = 'daily' + # period = 'weekly' + # period = 'monthly' + import numpy as np + import akshare as ak + stock = ak.stock_zh_a_hist(symbol=symbol, period=period, start_date=start_date, end_date=end_date) + title = np.array(stock.columns) + stock_data = stock.values[::-1] + return title, stock_data + +# 绘制股票图 +def plot_stock_line(date_array, opening_array, closing_array, high_array, low_array, lw_open_close=6, lw_high_low=2, xlabel='date', ylabel='price', title='', fontsize=20, labelsize=20, adjust_bottom=0.2, adjust_left=0.2, fontfamily='Times New Roman'): + import guan + plt, fig, ax = guan.import_plt_and_start_fig_ax(adjust_bottom=adjust_bottom, adjust_left=adjust_left, labelsize=labelsize, fontfamily=fontfamily) + if fontfamily=='Times New Roman': + ax.set_title(title, fontsize=fontsize, fontfamily='Times New Roman') + ax.set_xlabel(xlabel, fontsize=fontsize, fontfamily='Times New Roman') + ax.set_ylabel(ylabel, fontsize=fontsize, fontfamily='Times New Roman') + else: + ax.set_title(title, fontsize=fontsize) + ax.set_xlabel(xlabel, fontsize=fontsize) + ax.set_ylabel(ylabel, fontsize=fontsize) + for i0 in range(len(date_array)): + if opening_array[i0] <= closing_array[i0]: + 
ax.vlines(date_array[i0], opening_array[i0], closing_array[i0], linestyle='-', color='red', lw=lw_open_close) + ax.vlines(date_array[i0], low_array[i0], high_array[i0], color='red', linestyle='-', lw=lw_high_low) + else: + ax.vlines(date_array[i0], opening_array[i0], closing_array[i0], linestyle='-', color='green', lw=lw_open_close) + ax.vlines(date_array[i0], low_array[i0], high_array[i0], color='green', linestyle='-', lw=lw_high_low) + plt.show() + plt.close('all') + +# Guan软件包的使用统计(仅仅统计装机数和import次数) +def statistics_of_guan_package(function_name=None): + import guan + try: + import socket + datetime_date = guan.get_date() + datetime_time = guan.get_time() + current_version = guan.get_current_version('guan') + client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client_socket.settimeout(0.5) + client_socket.connect(('socket.guanjihuan.com', 12345)) + mac_address = guan.get_mac_address() + if function_name == None: + message = { + 'server': 'py.guanjihuan.com', + 'date': datetime_date, + 'time': datetime_time, + 'version': current_version, + 'MAC_address': mac_address, + } + else: + message = { + 'server': 'py.guanjihuan.com', + 'date': datetime_date, + 'time': datetime_time, + 'version': current_version, + 'MAC_address': mac_address, + 'function_name': function_name + } + import json + send_message = json.dumps(message) + client_socket.send(send_message.encode()) + client_socket.close() + except: + pass + +# Guan软件包升级检查和提示(如果无法连接或者版本为最新,那么均没有提示) +def notification_of_upgrade(timeout=5): + try: + import guan + latest_version = guan.get_latest_version(package_name='guan', timeout=timeout) + current_version = guan.get_current_version('guan') + if latest_version != None and current_version != None: + if latest_version != current_version: + print('升级提示:您当前使用的版本是 guan-'+current_version+',目前已经有最新版本 guan-'+latest_version+'。您可以通过以下命令对软件包进行升级:pip install --upgrade guan -i https://pypi.python.org/simple 或 pip install --upgrade guan') + except: + pass \ No newline at end of file diff --git a/README.md b/README.md index 996b3b5..4eb9b2b 100755 --- a/README.md +++ b/README.md @@ -26,9 +26,10 @@ import guan + file reading and writing + figure plotting + data processing ++ decorators ++ others + custom classes + functions using objects of custom classes -+ decorators ## About this package