Back to prev

Python Hard Way

Jan 21, 2021
Linkang Chan
@Jesse Chan

delete__pycache__

$ find . -name '__pycache__' -type d -exec rm -rf {} \;

doc

当我们日常开发时,一些不太常用的包或者想要确认某个方法的具体的说明时,可以通过命令行的方式去查看。

$ python -m pydoc sys

上述例子中的sys可以替换成任意你期望的包名, 或者该包名下的具体的方法。

$ python -m pydoc rich.theme

除此之外,我们还可以直接启动一个本地 server 的方式去更友好的查看文档。

# 在 localhost:1234 上启动一个文档服务
$ python -m pydoc -p 1234

some posts about python

[Asterisks in Python]: 讲解 Python 中有关*的使用

command line argument parse

使用 [click] 作为解析库,可以方便的实现子命令操作。整理一些使用中遇到的问题。

  • 对 argument 进行注释

使用多行注释的方式""" xxx """。但是 click 默认是现在在统一行,去除掉了换行的操作。所以在有多参数的情况下显示比较乱。解决方式是:

@click.command()
@click.argument('gt', type=click.Path(exists=True))
@click.argument('prefix', type=click.Path(exists=True))
def execute(gt, prefix):
    """
       \b 
       explain the command usage
       gt: xxxx
       prefix: xxxxx
    """

progess bar

使用 [tqdm] 搭配各种场景使用,比如在 requests 中显示现在的进度时,可以有如下的方式:

r = requests.get(url, stream=True, allow_redirects=True)
... # status code check 
path = pathlib.Path(filename).expanduser().resolve()
path.parent.mkdir(parents=True, exist_ok=True)

desc = filename.ljust(22, ' ') # 22 is the lenght bigger than filename, should change
r.raw.read = functools.partial(r.raw.read, decode_content=True)  # Decompress if needed
with tqdm.tqdm.wrapattr(r.raw, "read", total=file_size, desc=desc) as r_raw:
     with path.open("wb") as f:
            shutil.copyfileobj(r_raw, f)

同时也可以在命令行中使用:

find . -name '*.py' -type f -exec cat \{} \; \
  | tqdm --unit loc --unit_scale --total 857366 >> /dev/null
100%|█████████████████████████████████| 857K/857K [00:04<00:00, 246Kloc/s]

更多使用参考文档 [tqdm documention] 。

requirements.txt

这个文件可以用于python项目初始化时安装依赖使用。可以通过两种方式获取到:

# 获取完整的依赖环境
$ pip3 freeze > requirements.txt

# 获取必要的依赖
$ pip install pipreqs
$ pipreqs .

# 使用
$ pip install -r requirements.txt

tar file

获取 tar.gz 文件中的顶层目录的名称,使用下面简单的方式:

archive = tarfile.open(filepath, mode='r')
print os.path.commonprefix(archive.getnames())

zip file

使用 zipfile 库,更加灵活的打包 zip 包。

from zipfile import ZipFile

with ZipFile('target.zip', 'w') as newzip:
    newzip.write('directory_name')
    newzip.write('file_name')

使用 shutil 中的 make_archive 函数生成的包有点奇怪(?),不如使用 zipfile 来的灵活。

subprocess

使用 subprocess 时,往往需要添加子进程中的环境变量,可以使用:

import subprocess, os
my_env = os.environ.copy()
my_env["PATH"] = "/usr/sbin:/sbin:" + my_env["PATH"]
subprocess.Popen(my_command, env=my_env)

sh

sh is a full-fledged subprocess replacement for Python 2.6 - 3.8, PyPy and PyPy3 that allows you to call any program as if it were a function.

import sh
# like command run in bash. $ sed -i 's/a/A/g' filename
sh.sed(['-i', f's/a/A/g', filename]}'])

format

python 中用于格式的操作,在格式化数字的时候,可以方便的控制小数点后的面位数。具体的可以参考[string format]

"{:.2f}".format(13.949999999999999)

listdir

python 中遍历目录有好几种方式,不同的方式满足于不同的场景。

  • os.listdir

列举出当前目录下所有的文件,同时我们可以通过文件类型去进行过滤。比如:

import os 

files = [f for f in os.listdir('.') if os.path.isfile(f)]
dirs = [d for d in os.listdir('.') if os.path.isdir(d)]
  • os.walk

walk 一般会递归去获取当前目录下所有的文件包含子目录,需要通过指定一些配置来满足我们的需求。简单的使用如下:

import os

for root, dirs, files in os.walk('.', topdown=True):
    dirs.clear()
    for file in files:
        print(file)

dirs.clear()用于不递归遍历当前目录下的子目录。 如果不删除,则会显示当前目录下所有的文件包含子目录里面的文件。这需要结合具体的场景。

  • find(shell command with subprocess)

这种方式是结合了 shell 的一些特性,一般不得以的情况下才使用。

chunk

def chunk(iterable, n):
    d = {}
    for i, x in enumerate(iterable):
        d.setdefault(i//n, []).append(x)
    
    return list(d.values())

或者使用more_itertools这个库, 可以更加简单的实现。

import more_itertools as mit

list(mit.chunked(iterable, n))

补充:

使用列表生成器也是可以完成相应的操作:

[lst[i:i + n] for i in range(0, len(lst), n)]

[stackoverflow answer]: 不造轮子,使用对应的库

atexit

atexit 在注册时传递参数:


def goodbye(name, adjective):
    print('Goodbye %s, it was %s to meet you.' % (name, adjective))

import atexit

atexit.register(goodbye, adjective='nice', name='Donny')

OrderedDict

将字典 key 转换成 list,可以按照如下的方式:

>>> from collections import OrderedDict
>>> a = OrderedDict({'a': 1, 'b':2})
>>> a.keys()
odict_keys(['a', 'b'])
>>> b = [*a]
>>> b
['a', 'b']
>>> 

单纯的 keys 是odict_keys类型,而不是list类型。所以通过解引用直接获取生成 key 的 list 。

logging

在使用 logging 时,我们在自定义输出格式时,使用如下的方式:

logging.basicConfig(format='%(asctime)s,%(msecs)03d %(levelname)-8s [%(pathname)s:%(lineno)d in function %(funcName)s] %(message)s', datefmt='%Y-%m-%d:%H:%M:%S', level=logging.DEBUG)

注意这边的%(msecs)03d这个是用来保留 3 位毫秒数的,方便后续的处理。

list 间隔差

t = [1,2,4,6]
v = [t[i+1]-t[i] for i in range(len(t)-1)]
# [1,2,2]

统计数据分布

from itertools import groupby

def histogram(data, step):
    for k, g in groupby(sorted(data), key=lambda x: x//step):
        print('{}-{}: {}'.format(k*step, (k+1)*step-1, len(list(g))))

随机采样

import random
import glob

pbfiles = [f for f in glob.glob("trt_data_pb/*.pb")]

random_pb = random.sample(pbfiles, 1000)
print(random_pb[:10])

采用random.sample这个可以在一组 list 中,随机获取指定数量的内容。 如果只是想选一个,可以使用random.choice

pprint

正常使用 print 的打印效果一般,不够 pretty, 所以我们可以使用 pprint 库来完成更好的打印,正常情况下我们会使用到一些基本的参数来控制打印的效果。同时可以通过自定义统一的打印风格。

>>> from pprint import PrettyPrinter
>>> custom_printer = PrettyPrinter(
...     indent=4,
...     width=100,
...     depth=2,
...     compact=True,
...     sort_dicts=False
... )
...
>>> user={"name": "jesse", "age": 1, "address": {"street": "xxx", "city": "xxxxx"}}
>>> custom_printer.pprint(user)
{'name': 'jesse', 'age': 1, 'address': {'street': 'xxx', 'city': 'xxxxx'}}

实际上我们可以通过单独设置的方式来打印这些内容,比如:

>>> from pprint import pprint
>>> pprint(users, indent=4, depth=2)

frozenset

众所周知,dict 是一个 mutable 的结构,所以我们无法像 set 一样对两个 dict 进行一些对比操作。还在 Python 提供了一个frozenset的内置函数可以用来将 dict 转换成一个 set 。这样我们便可以使用 set 的特性来进行 dict 的对比操作。

>>> dict_a = {"Hello": "world"}
>>> dict_b = {"hello": "world"}
>>> frozenset(dict_a) == frozenset(dict_b)
False
>>> frozenset(dict_a) - frozenset(dict_b)
frozenset({'Hello'})

dict

正常情况下,我们对 dict 访问不存在的 key 时会报错 keyerror。 因此当我们想对不存在的 key 访问时,我们可以使用default参数来指定一个默认的返回值。

>>> d = {"a": 1, "b": 2, "c": 3}
>>> d.get('d', 4)
4

此外,我们还可以通过defaultdict来统一配置默认的返回值。

>>> from collections import defaultdict
>>> d = {"a": 1, "b": 2, "c": 3}
>>> d = defaultdict(lambda: None, d)
>>> d["a"]
1
>>> print(d["d"])
None

当我们需要批量获取指定的 key 的值,可以通过itemgetter来完成,比如:

>>> from operator import itemgetter
>>> d = {"a": 1, "b": 2, "c": 3}
>>> items = ['a', 'b', 'c']
>>> a, b, c = itemgetter(*items)(d)
>>> a, b, c
(1, 2, 3)

json

在处理 json load 时,提供了两个 hook 接口,一个是object_pairs_hook,另一个是object_hook,这两个函数中均可对期望的字段或者内容进行相应的处理。其中object_pairs_hook的优先级更高一些。 这两者的差别在于函数的入参。object_hook的入参是一个 dict ,而object_pairs_hook的入参是一个 list ,其中每一个元素都是一个 tuple ,第一个元素是 key ,第二个元素是 value 。

import json

json.loads('{"foo": "bar"}', object_pairs_hook=print)
json.loads('{"foo": "bar"}', object_hook=print)

# [('foo', 'bar')]
# {'foo': 'bar'}

textwrap.dedent

在使用多行文本编辑时,我们可能会使用到 textwrap.dedent 函数,它可以将多行文本的缩进去掉。 否则,就会按照实际填写的锁进原样输出。比如下面的例子。

import textwrap

text = """
    int main(int argc, char **argv) {
        print("hello world");
    }
"""

print("---")
print(text)
print("---")
print(textwrap.dedent(text))
print("---")

运行后我们可以看到如下的输出:

---

    int main(int argc, char **argv) {
        print("hello world");
    }

---

int main(int argc, char **argv) {
    print("hello world");
}

---

bytes 转 str 编码

Python3 中有两种方式进行转换

encoding = 'utf-8'
b'hello'.decode(encoding)

---

str(b'hello', encoding)

remove suffix

在 python 3.9+ 可以直接使用removesuffix等函数,在比较低的版本上,我们可以使用如下的函数进行替换。

def remove_suffix(text, suffix): 
    return text[:-len(suffix)] if text.endswith(suffix) and len(suffix) != 0 else text

使用 Requests 传递 formdata

需要使用到 urllib3 的相关函数。相关文档

import json
import requests

from ulrlib3 import encode_multipart_formdata

payload = {
        "name": (None, "xxx"),
        "args": (None, json.dumps({
            "xxx":2,
            "xxx": 'xxx',
            "xxx": False
        })),
        "smiles": (None, json.dumps(['xxx','xxxx'])),
        "file": ("filename", open(filename).read())
}

body, header_option = encode_multipart_formdata(payload)
headers['Content-Type'] = header_option

url = 'xxxx'
resp = requests.request("POST", url, data=body, headers=headers)
print(resp)

统计代码运行时间

  • time 模块
import time

t1 = time.time()
...
t2 = time.time()

elapse = t2-t1
  • timeit 模块

timer

from timeit import default_timer as timer

t1 = timer()
...
t2 = timer()
elapse = t2 - t1

命令行

$ python -m timeit '"-".join(str(n) for n in range(100))'

代码

import timeit

timeit.timeit(lambda: "-".join(map(str, range(100))), number=1000)

装饰器

from timeit import default_timer as timer

def timer_func(func):
    def wrapper(*args, **kwargs):
        t1 = timer()
        result = func(*args, **kwargs)
        t2 = timer()
        print(f'{func.__name__}() executed in {(t2-t1):.6f}s')
        return result
    return wrapper

@timer_func
def my_func():
    ....

short unique random string

我们可以通过比较通用的方式来生成随机的字符串。

import random
import shortuuid
import uuid

alphabet = string.ascii_lowercase + string.digits
su = shortuuid.ShortUUID(alphabet=alphabet)

def random_choice():
    return ''.join(random.choices(alphabet, k=8))

def truncated_uuid4():
    return str(uuid.uuid4())[:8]

def shortuuid_random():
    return su.random(length=8)

def secrets_random_choice():
    return ''.join(secrets.choice(alphabet) for _ in range(8))

其中最快的是random_choice,且无过多的依赖。

csv

当我们在开发一个通用工具时,可能会遇到不同平台或者不同工具生成的 csv 文件,这是我们需要考虑的问题,好在 csv 库提供了一些方法,让我们可以方便的去判断。

Sniff

该类提供了两个 handy 的方法,sniff用于来测试当前文件的使用的dialect。这个主要是给 reader 和 writer 函数使用的。

import csv 
buffered = io.BufferedReader(cast(io.RawIOBase, fp), buffer_size=4096)
first_bytes = buffered.peek(2048).strip()
dialect = csv.Sniffer().sniff(
    first_bytes.decode(encoding or "utf-8-sig", "ignore")
)

# then
csv.DictReader(decoded_fp, dialect=dialect)

此外,如果我们清楚是那种 dialect 时,csv 库提供了几个定义好的 dialect。包括:

  • csv.excel
  • csv.excel_tab
  • csv.unix_dialect

也可以通过自定义的方式来定义一个新的 dialect。 使用list_dialects()获取当前的所有的 dialect。

另外一个方法是has_header,用来通过经验判断当前的 csv 是否包含一个 header。

import csv

has_header = csv.Sniffer().has_header(fd.read(2048))
fd.seek(0)

field_size_limit

csv 文件可能会存在非常多的列,在 csv 的库中,我们可以通过field_size_limit来设置最大的列的信息。可以使用如下方式:

import csv
import sys

raw_size_back = csv.field_size_limit()
maxInt = sys.maxsize

while True:
    try:
        csv.field_size_limit(maxInt)
        break
    except OverflowError:
        maxInt = int(maxInt/10)

[stackoverflow]: _csv.Error: field larger than field limit (131072)

change dir

我们在 python 中要切换目录,可以使用os.chdir, 更好的一个做法是使用使用 contextmanager

from contextlib import contextmanager
from pathlib import Path

import os

@contextmanager
def change_directory(path: Path):
    origin = Path().absolute()
    try:
        os.chdir(path)
        yield
    finally:
        os.chdir(origin)

with change_directory('jsme'):
    print(Path().absolute())

explicitly ignore error

除了常规的 try catch 去处理,我们还可以使用 contextlib 去简单的实现:

import os
from contextlib import suppress

with suppress(FileNotFoundError):
    os.remove('somefile.tmp')

logging support

with structlog support, 可以设置成如下的方式,该方式可以重定向日志到文件。

import logging.config
import structlog

timestamper = structlog.processors.TimeStamper(fmt="iso")
logging.config.dictConfig({
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            "default": {
                "level": "DEBUG",
                "class": "logging.StreamHandler",
            },
            "file": {
                "level": "DEBUG",
                "class": "logging.handlers.WatchedFileHandler",
                "filename": "test.log",
            },
        },
        "loggers": {
            "": {
                "handlers": ["default", "file"],
                "level": "DEBUG",
                "propagate": True,
            },
        }
})
structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.stdlib.PositionalArgumentsFormatter(),
        timestamper,
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.stdlib.ProcessorFormatter.wrap_for_formatter,
    ],
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)
structlog.get_logger("test").info("hello")

args and kwargs 的类型标注

Arbitrary argument lists can as well be type annotated, so that the definition:

def foo(*args: str, **kwds: int): ...

is acceptable and it means that, e.g., all of the following represent function calls with valid types of arguments:

foo('a', 'b', 'c')
foo(x=1, y=2)
foo('', z=0)

In the body of function foo, the type of variable args is deduced as Tuple[str, …] and the type of variable kwds is Dict[str, int].