- deletepycache
- doc
- some posts about python
- command line argument parse
- progess bar
- requirements.txt
- tar file
- zip file
- subprocess
- sh
- format
- listdir
- chunk
- atexit
- OrderedDict
- logging
- list 间隔差
- 统计数据分布
- 随机采样
- pprint
- frozenset
- dict
- json
- textwrap.dedent
- bytes 转 str 编码
- remove suffix
- 使用 Requests 传递 formdata
- 统计代码运行时间
- short unique random string
- csv
- change dir
- explicitly ignore error
- logging support
__pycache__
delete$ find . -name '__pycache__' -type d -exec rm -rf {} \;
doc
当我们日常开发时,一些不太常用的包或者想要确认某个方法的具体的说明时,可以通过命令行的方式去查看。
$ python -m pydoc sys
上述例子中的sys
可以替换成任意你期望的包名, 或者该包名下的具体的方法。
$ python -m pydoc rich.theme
除此之外,我们还可以直接启动一个本地 server 的方式去更友好的查看文档。
# 在 localhost:1234 上启动一个文档服务
$ python -m pydoc -p 1234
some posts about python
*
的使用command line argument parse
使用 [click] 作为解析库,可以方便的实现子命令操作。整理一些使用中遇到的问题。
- 对 argument 进行注释
使用多行注释的方式""" xxx """
。但是 click 默认是现在在统一行,去除掉了换行的操作。所以在有多参数的情况下显示比较乱。解决方式是:
@click.command()
@click.argument('gt', type=click.Path(exists=True))
@click.argument('prefix', type=click.Path(exists=True))
def execute(gt, prefix):
"""
\b
explain the command usage
gt: xxxx
prefix: xxxxx
"""
progess bar
使用 [tqdm] 搭配各种场景使用,比如在 requests 中显示现在的进度时,可以有如下的方式:
r = requests.get(url, stream=True, allow_redirects=True)
... # status code check
path = pathlib.Path(filename).expanduser().resolve()
path.parent.mkdir(parents=True, exist_ok=True)
desc = filename.ljust(22, ' ') # 22 is the lenght bigger than filename, should change
r.raw.read = functools.partial(r.raw.read, decode_content=True) # Decompress if needed
with tqdm.tqdm.wrapattr(r.raw, "read", total=file_size, desc=desc) as r_raw:
with path.open("wb") as f:
shutil.copyfileobj(r_raw, f)
同时也可以在命令行中使用:
find . -name '*.py' -type f -exec cat \{} \; \
| tqdm --unit loc --unit_scale --total 857366 >> /dev/null
100%|█████████████████████████████████| 857K/857K [00:04<00:00, 246Kloc/s]
更多使用参考文档 [tqdm documention] 。
requirements.txt
这个文件可以用于python项目初始化时安装依赖使用。可以通过两种方式获取到:
# 获取完整的依赖环境
$ pip3 freeze > requirements.txt
# 获取必要的依赖
$ pip install pipreqs
$ pipreqs .
# 使用
$ pip install -r requirements.txt
tar file
获取 tar.gz 文件中的顶层目录的名称,使用下面简单的方式:
archive = tarfile.open(filepath, mode='r')
print os.path.commonprefix(archive.getnames())
zip file
使用 zipfile 库,更加灵活的打包 zip 包。
from zipfile import ZipFile
with ZipFile('target.zip', 'w') as newzip:
newzip.write('directory_name')
newzip.write('file_name')
使用 shutil 中的 make_archive 函数生成的包有点奇怪(?),不如使用 zipfile 来的灵活。
subprocess
使用 subprocess 时,往往需要添加子进程中的环境变量,可以使用:
import subprocess, os
my_env = os.environ.copy()
my_env["PATH"] = "/usr/sbin:/sbin:" + my_env["PATH"]
subprocess.Popen(my_command, env=my_env)
sh
sh is a full-fledged subprocess replacement for Python 2.6 - 3.8, PyPy and PyPy3 that allows you to call any program as if it were a function.
import sh
# like command run in bash. $ sed -i 's/a/A/g' filename
sh.sed(['-i', f's/a/A/g', filename]}'])
format
python 中用于格式的操作,在格式化数字的时候,可以方便的控制小数点后的面位数。具体的可以参考[string format]
"{:.2f}".format(13.949999999999999)
listdir
python 中遍历目录有好几种方式,不同的方式满足于不同的场景。
- os.listdir
列举出当前目录下所有的文件,同时我们可以通过文件类型去进行过滤。比如:
import os
files = [f for f in os.listdir('.') if os.path.isfile(f)]
dirs = [d for d in os.listdir('.') if os.path.isdir(d)]
- os.walk
walk 一般会递归去获取当前目录下所有的文件包含子目录,需要通过指定一些配置来满足我们的需求。简单的使用如下:
import os
for root, dirs, files in os.walk('.', topdown=True):
dirs.clear()
for file in files:
print(file)
dirs.clear()
用于不递归遍历当前目录下的子目录。 如果不删除,则会显示当前目录下所有的文件包含子目录里面的文件。这需要结合具体的场景。
- find(shell command with subprocess)
这种方式是结合了 shell 的一些特性,一般不得以的情况下才使用。
chunk
def chunk(iterable, n):
d = {}
for i, x in enumerate(iterable):
d.setdefault(i//n, []).append(x)
return list(d.values())
或者使用more_itertools
这个库, 可以更加简单的实现。
import more_itertools as mit
list(mit.chunked(iterable, n))
补充:
使用列表生成器也是可以完成相应的操作:
[lst[i:i + n] for i in range(0, len(lst), n)]
atexit
atexit 在注册时传递参数:
def goodbye(name, adjective):
print('Goodbye %s, it was %s to meet you.' % (name, adjective))
import atexit
atexit.register(goodbye, adjective='nice', name='Donny')
OrderedDict
将字典 key 转换成 list,可以按照如下的方式:
>>> from collections import OrderedDict
>>> a = OrderedDict({'a': 1, 'b':2})
>>> a.keys()
odict_keys(['a', 'b'])
>>> b = [*a]
>>> b
['a', 'b']
>>>
单纯的 keys 是odict_keys
类型,而不是list
类型。所以通过解引用直接获取生成 key 的 list 。
logging
在使用 logging 时,我们在自定义输出格式时,使用如下的方式:
logging.basicConfig(format='%(asctime)s,%(msecs)03d %(levelname)-8s [%(pathname)s:%(lineno)d in function %(funcName)s] %(message)s', datefmt='%Y-%m-%d:%H:%M:%S', level=logging.DEBUG)
注意这边的%(msecs)03d
这个是用来保留 3 位毫秒数的,方便后续的处理。
list 间隔差
t = [1,2,4,6]
v = [t[i+1]-t[i] for i in range(len(t)-1)]
# [1,2,2]
统计数据分布
from itertools import groupby
def histogram(data, step):
for k, g in groupby(sorted(data), key=lambda x: x//step):
print('{}-{}: {}'.format(k*step, (k+1)*step-1, len(list(g))))
随机采样
import random
import glob
pbfiles = [f for f in glob.glob("trt_data_pb/*.pb")]
random_pb = random.sample(pbfiles, 1000)
print(random_pb[:10])
采用random.sample
这个可以在一组 list 中,随机获取指定数量的内容。 如果只是想选一个,可以使用random.choice
pprint
正常使用 print 的打印效果一般,不够 pretty, 所以我们可以使用 pprint 库来完成更好的打印,正常情况下我们会使用到一些基本的参数来控制打印的效果。同时可以通过自定义统一的打印风格。
>>> from pprint import PrettyPrinter
>>> custom_printer = PrettyPrinter(
... indent=4,
... width=100,
... depth=2,
... compact=True,
... sort_dicts=False
... )
...
>>> user={"name": "jesse", "age": 1, "address": {"street": "xxx", "city": "xxxxx"}}
>>> custom_printer.pprint(user)
{'name': 'jesse', 'age': 1, 'address': {'street': 'xxx', 'city': 'xxxxx'}}
实际上我们可以通过单独设置的方式来打印这些内容,比如:
>>> from pprint import pprint
>>> pprint(users, indent=4, depth=2)
frozenset
众所周知,dict 是一个 mutable 的结构,所以我们无法像 set 一样对两个 dict 进行一些对比操作。还在 Python 提供了一个frozenset
的内置函数可以用来将 dict 转换成一个 set 。这样我们便可以使用 set 的特性来进行 dict 的对比操作。
>>> dict_a = {"Hello": "world"}
>>> dict_b = {"hello": "world"}
>>> frozenset(dict_a) == frozenset(dict_b)
False
>>> frozenset(dict_a) - frozenset(dict_b)
frozenset({'Hello'})
dict
正常情况下,我们对 dict 访问不存在的 key 时会报错 keyerror。 因此当我们想对不存在的 key 访问时,我们可以使用default
参数来指定一个默认的返回值。
>>> d = {"a": 1, "b": 2, "c": 3}
>>> d.get('d', 4)
4
此外,我们还可以通过defaultdict
来统一配置默认的返回值。
>>> from collections import defaultdict
>>> d = {"a": 1, "b": 2, "c": 3}
>>> d = defaultdict(lambda: None, d)
>>> d["a"]
1
>>> print(d["d"])
None
当我们需要批量获取指定的 key 的值,可以通过itemgetter
来完成,比如:
>>> from operator import itemgetter
>>> d = {"a": 1, "b": 2, "c": 3}
>>> items = ['a', 'b', 'c']
>>> a, b, c = itemgetter(*items)(d)
>>> a, b, c
(1, 2, 3)
json
在处理 json load 时,提供了两个 hook 接口,一个是object_pairs_hook
,另一个是object_hook
,这两个函数中均可对期望的字段或者内容进行相应的处理。其中object_pairs_hook
的优先级更高一些。 这两者的差别在于函数的入参。object_hook
的入参是一个 dict ,而object_pairs_hook
的入参是一个 list ,其中每一个元素都是一个 tuple ,第一个元素是 key ,第二个元素是 value 。
import json
json.loads('{"foo": "bar"}', object_pairs_hook=print)
json.loads('{"foo": "bar"}', object_hook=print)
# [('foo', 'bar')]
# {'foo': 'bar'}
textwrap.dedent
在使用多行文本编辑时,我们可能会使用到 textwrap.dedent 函数,它可以将多行文本的缩进去掉。 否则,就会按照实际填写的锁进原样输出。比如下面的例子。
import textwrap
text = """
int main(int argc, char **argv) {
print("hello world");
}
"""
print("---")
print(text)
print("---")
print(textwrap.dedent(text))
print("---")
运行后我们可以看到如下的输出:
---
int main(int argc, char **argv) {
print("hello world");
}
---
int main(int argc, char **argv) {
print("hello world");
}
---
bytes 转 str 编码
Python3 中有两种方式进行转换
encoding = 'utf-8'
b'hello'.decode(encoding)
---
str(b'hello', encoding)
remove suffix
在 python 3.9+ 可以直接使用removesuffix
等函数,在比较低的版本上,我们可以使用如下的函数进行替换。
def remove_suffix(text, suffix):
return text[:-len(suffix)] if text.endswith(suffix) and len(suffix) != 0 else text
使用 Requests 传递 formdata
需要使用到 urllib3 的相关函数。相关文档
import json
import requests
from ulrlib3 import encode_multipart_formdata
payload = {
"name": (None, "xxx"),
"args": (None, json.dumps({
"xxx":2,
"xxx": 'xxx',
"xxx": False
})),
"smiles": (None, json.dumps(['xxx','xxxx'])),
"file": ("filename", open(filename).read())
}
body, header_option = encode_multipart_formdata(payload)
headers['Content-Type'] = header_option
url = 'xxxx'
resp = requests.request("POST", url, data=body, headers=headers)
print(resp)
统计代码运行时间
- time 模块
import time
t1 = time.time()
...
t2 = time.time()
elapse = t2-t1
- timeit 模块
timer
from timeit import default_timer as timer
t1 = timer()
...
t2 = timer()
elapse = t2 - t1
命令行
$ python -m timeit '"-".join(str(n) for n in range(100))'
代码
import timeit
timeit.timeit(lambda: "-".join(map(str, range(100))), number=1000)
装饰器
from timeit import default_timer as timer
def timer_func(func):
def wrapper(*args, **kwargs):
t1 = timer()
result = func(*args, **kwargs)
t2 = timer()
print(f'{func.__name__}() executed in {(t2-t1):.6f}s')
return result
return wrapper
@timer_func
def my_func():
....
short unique random string
我们可以通过比较通用的方式来生成随机的字符串。
import random
import shortuuid
import uuid
alphabet = string.ascii_lowercase + string.digits
su = shortuuid.ShortUUID(alphabet=alphabet)
def random_choice():
return ''.join(random.choices(alphabet, k=8))
def truncated_uuid4():
return str(uuid.uuid4())[:8]
def shortuuid_random():
return su.random(length=8)
def secrets_random_choice():
return ''.join(secrets.choice(alphabet) for _ in range(8))
其中最快的是random_choice
,且无过多的依赖。
csv
当我们在开发一个通用工具时,可能会遇到不同平台或者不同工具生成的 csv 文件,这是我们需要考虑的问题,好在 csv 库提供了一些方法,让我们可以方便的去判断。
Sniff
该类提供了两个 handy 的方法,sniff
用于来测试当前文件的使用的dialect
。这个主要是给 reader 和 writer 函数使用的。
import csv
buffered = io.BufferedReader(cast(io.RawIOBase, fp), buffer_size=4096)
first_bytes = buffered.peek(2048).strip()
dialect = csv.Sniffer().sniff(
first_bytes.decode(encoding or "utf-8-sig", "ignore")
)
# then
csv.DictReader(decoded_fp, dialect=dialect)
此外,如果我们清楚是那种 dialect 时,csv 库提供了几个定义好的 dialect。包括:
- csv.excel
- csv.excel_tab
- csv.unix_dialect
也可以通过自定义的方式来定义一个新的 dialect。 使用list_dialects()
获取当前的所有的 dialect。
另外一个方法是has_header
,用来通过经验判断当前的 csv 是否包含一个 header。
import csv
has_header = csv.Sniffer().has_header(fd.read(2048))
fd.seek(0)
field_size_limit
csv 文件可能会存在非常多的列,在 csv 的库中,我们可以通过field_size_limit
来设置最大的列的信息。可以使用如下方式:
import csv
import sys
raw_size_back = csv.field_size_limit()
maxInt = sys.maxsize
while True:
try:
csv.field_size_limit(maxInt)
break
except OverflowError:
maxInt = int(maxInt/10)
change dir
我们在 python 中要切换目录,可以使用os.chdir
, 更好的一个做法是使用使用 contextmanager
from contextlib import contextmanager
from pathlib import Path
import os
@contextmanager
def change_directory(path: Path):
origin = Path().absolute()
try:
os.chdir(path)
yield
finally:
os.chdir(origin)
with change_directory('jsme'):
print(Path().absolute())
explicitly ignore error
除了常规的 try catch 去处理,我们还可以使用 contextlib 去简单的实现:
import os
from contextlib import suppress
with suppress(FileNotFoundError):
os.remove('somefile.tmp')
logging support
with structlog support, 可以设置成如下的方式,该方式可以重定向日志到文件。
import logging.config
import structlog
timestamper = structlog.processors.TimeStamper(fmt="iso")
logging.config.dictConfig({
"version": 1,
"disable_existing_loggers": False,
"handlers": {
"default": {
"level": "DEBUG",
"class": "logging.StreamHandler",
},
"file": {
"level": "DEBUG",
"class": "logging.handlers.WatchedFileHandler",
"filename": "test.log",
},
},
"loggers": {
"": {
"handlers": ["default", "file"],
"level": "DEBUG",
"propagate": True,
},
}
})
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
timestamper,
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
structlog.get_logger("test").info("hello")
args and kwargs 的类型标注
Arbitrary argument lists can as well be type annotated, so that the definition:
def foo(*args: str, **kwds: int): ...
is acceptable and it means that, e.g., all of the following represent function calls with valid types of arguments:
foo('a', 'b', 'c')
foo(x=1, y=2)
foo('', z=0)
In the body of function foo, the type of variable args is deduced as Tuple[str, …] and the type of variable kwds is Dict[str, int].