Redis与fork系统调用

Redis为什么不能在Windows上工作

因为Redis虽然使用ANSI C编写,兼容所有编译器版本,但是其调用了只有*nix系统才存在的fork系统调用,而Windows上没有这个调用。

为什么Redis需要fork

Redis作为非常快速的内存数据库,也需要持久化保存数据到硬盘的能力。如果单纯凭借自己实现,就会产生很多竞争或者数据一致性、或者是保存时不得不暂停写入(甚至读取)的问题。而使用fork系统调用,就可以充分利用fork的COW(写时复制)技术来加速。

C/C++常见内存问题定位

本文旨在记录Linux下C/C++常见的内存错误。

SIGSEGV

段错误,最常见的内存错误,是由于使用错误的方式访问内存导致的,常见的错误方式有下面几种。

  1. 解引用空指针,这是最可能发生的段错误,在有core dump文件时也很容易定位,看到fault addr时0时即可基本确认该问题是引起的。

    然而需要注意的是,解引用空指针并非绝对不可行的,之所以空指针指向空而不是虚拟内存的起始位置,是因为大部分情况下系统并未提供虚拟地址0附近内存区间的映射,如果一定要映射,在有root权限的情况下是可以通过mmap实现对空指针的映射的。

    1
    2
    3
    4
    int main() {
    int *p = NULL;
    *p = 23456; // segmetation fault
    }
  2. 解引用值为未被映射的地址的的指针。通常是未初始化的指针变量造成的野指针导致的。

  3. 尝试修改不具有写权限的内存。

    1
    2
    3
    4
    int main() {
    char *str = "This is a static string";
    str[0] = 't'; // segmetation fault
    }
  4. 未开启栈保护时,发生栈溢出的情况。此时函数return时会由于栈中存储的caller地址错误而大概率产生segment fault.

SIGABRT

程序终止,这个信号通常是由程序自身发出,编程人员也可以通过手动调用abort()函数来为本进程产生这个信号,这个信号结束的程序通常是在运行时检测到的错误。常见的错误原因有下面几种。

  1. double free,即free已经free过的指针,glibc检测到堆内存中的double free会直接调用abort()终止进程。

    1
    2
    3
    4
    5
    6
    7
    #include<malloc.h>

    int main() {
    void *ptr = malloc(100);
    free(ptr);
    free(ptr); // Aborted
    }
  2. stack smashing,栈损坏,当开启了gcc的栈保护功能时,gcc在每次函数退出时都会尝试检查栈中的数据是否被修改,如果被修改则会终止程序,避免栈溢出造成更大的问题。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    #include <stdio.h>
    #include <string.h>
    #include <assert.h>

    void print_hex(void* data, size_t len) {
    char* buf = (char*)data;
    for (int i = 0; i < len; i++) {
    printf("%02X ", (unsigned char)buf[i]);
    if (i % 16 == 15) {
    printf("\n");
    }
    }
    }

    int read() {
    char buf[10] = {0};
    printf("buf = %p\n", buf);
    print_hex(buf, 80);
    printf("\n");
    strncpy(buf, "0123456789A", 11);
    print_hex(buf, 40);
    // aborted,因为libc检测到了栈损坏。
    return 0;
    }

    int main() {
    int res = read();
    printf("res = %d\n", res); // 不会被打印,已经aborted了
    }

  3. 堆内存结构损坏。当malloc使用的元数据被修改后,其检查到无法进行正常内存管理时会释放内存。

  4. assert失败,断言失败也会产生该问题。

    1
    2
    3
    int main() {
    asset(0 == 1);// false, aborted
    }

SIGBUS

在x86 CPU上这种信号并不常见,但可能会在ARM CPU上出现。

  1. 非对齐的原子内存访问,例如ldaxrstlxr两个指令,在未对齐(寄存器为x时为8字节对齐,w为4字节对齐)的情况下会产生该信号,其和系统是否允许对齐无关。

    1
    2
    3
    4
    :main
    ldaxr x0, =0x12345677 # 这里会产生sigbus,因为有非对齐的原子内存访问
    ldaxr x0, =0x12345678 # 这里会产生段错误,因为对齐了,但是虚拟内存里没有这个页,当然如果事先做了map就没问题。

  2. 非对其的指令访问。ARM指令集为等长的,均为4字节,当使用b类指令跳转到一个没有四字节对齐的地址也会产生该信号,通常这在C面向对象编程时错误的修改函数指针有关。不过在X86上可能会表现为Illegal Instruction,因为X86架构并非等长指令,即使不对齐也可以执行指令,但大概率这个指令是没见过的指令。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    #include<stdio.h>
    #include<stdint.h>

    void function() {
    printf("做点什么");
    return;
    }

    uint64_t counter[8] = {0};
    void(*func_ptr)() = NULL;

    int init() {
    // 不相关代码...
    func_ptr = function;
    // 不相关代码...
    return 0;
    }

    int somthing_counter_add() {
    // 这里其实是对counter中某个元素的自增操作,但是由于编译器内存布局具有一定
    func_ptr++;
    }

    int main() {
    init();
    somthing_counter_add();
    func_ptr();// sigbus
    }

RSA浅析

什么是RSA

简单地说,RSA就是一种非对称加密算法,它组成了今天计算机网络安全的基石。每个人的浏览、消息、支付等背后,都有RSA的身影。虽然其有很多竞争者,但直到如今,RSA依然是主流的非对称加密方式。比如B站的证书就是基于RSA的。

本文旨在让已经对非对称加密有一定了解的朋友了解RSA所使用的算法。如果你不了解非对称加密,本文并不能作为非对称加密的科普,可以去网上搜索看看其他的非对称加密相关的介绍。

RSA中使用到了许多定理与一些为被证明的假设,本文仅使用这些定理与假设,不深究其具体细节与证明。

函数与运算符号

部分函数虽然小学就学过了,但我们可能对它们的名字比较陌生,先看看这些函数的名字。

  1. $gcd(m, n) $:计算m与n的最大公因数。
  2. $lcm(m, n) $:计算m与n的最小公倍数。
  3. $m \mod n$ 或 $m % n$:计算m除n的余数,也称作m模n,前者在数学中出现,后者在计算机程序中出现。
  4. $a ≡ b (\mod c)$: a除c的余数等于b除c的余数,也被称作a与b模c同余。其中恒等号可换为等号。

如何生成一个RSA密钥?

  1. 生成两个质数 p 和 q。

    p和q应该是随机的,并且差距应该足够大。

  2. 计算 $N=pq$

  3. 计算 $λ(N)$,$λ(N) = lcm(λ(p),λ(q)) = lcm(φ(p),φ(q)) = lcm(p-1, q-1)$

  4. 选择一个数e,使得e与λ(n)互质。这里e是公钥指数。

  5. 根据e与λ(n),计算私钥指数d,满足 $ed \equiv 1\mod N$。

经历过这些步骤,我们得到了RSA加解密所需要的全部信息,将(N, d)作为私钥保存,而(N, e)作为公钥发布,而其中的p、q、λ(n) 均无需保留,可以抛弃。不过在大多数的RSA实现中,p和q都被保留在私钥文件中。

算法演示

我们使用一个较小的数来演示生成RSA密钥的方式。按照上面提到的方法生成一个演示密钥。

  1. 选择p=257,q=379.
  2. N = 257 * 379 = 97403.
  3. λ(N) = lcm(256, 378) = 48384.
  4. 选择e = 5.
  5. 计算得到d = 9677.

可以验证,(5*9677) mod 48384 = 1.

其实这里我想选e=3,发现3居然是48384的因子,遂选择5作为公钥指数。

组装公钥E为(N = 97403, e = 5),而私钥D为(N = 97403, d = 9677)。

公钥加密

当Bob持有Alice的公钥E时,如果它想给Alice发送一个秘密消息msg,需要先将m转换为一个不大于N的数字m,加密方法为计算 $c = m^e \mod N$,此时c就是密文,可以通过不安全的信道传送给Alice。

假设Bob要发送“hi”,那么可以将“hi”两个字母中的ASCII编码,计算可得“hi” = 104*256+105*1 = 26729,接下来算$26729^5 \mod 97403 = 46004$,此时49388即可发送给Alice。

私钥解密

当Alice接收到Bob的密文c时,Alice可以使用他的私钥解密。解密方法为计算$m = c^d \mod N$。

计算得到 $46004^{9677} \mod 97403 = 26729$,按照拼接ASCII的反向算法就可以得到消息msg为“hi”了。

解决消息过长

上面的例子中,Bob给Alice发送的消息仅有两个ascii字符,如果有三个,那计算得到的结果会超出N,虽然依然可以算,但是解密后的消息就不是原消息M了。比如Bob发送了消息“ack”,编码后为“ack” = 97*256**2+99*256+107 = 6382443. 继续计算密文得到c=6473,而解密后得到m为51248,和原消息不同。

解决方案非常简单:将消息分段,使每段可以满足m<N,再将分段一起发送或保存即可。

而实际应用中,非对称加密常被用于传递对称加密的密钥。RSA密钥中的N往往有2048bits或者更高的值,使用它来传递一个通常只有256bits的对称密钥是无需分段的。

密钥长度

密钥长度就是平时所谓的密钥的位数,比如256位AES就是说AES的密钥长度是256位。而RSA的密钥长度指的就是N的长度,即N有多少比特,或者说是N在二进制下的位数。

比如上文演示用的密钥N为97403,二进制为“10111110001111011”,即该密钥的位数为17位。目前被认为较为安全的是2048位及以上的密钥长度。

安全性

实践中保证RSA的安全性,需要很多细节工作。这里我们只讨论理想条件下,增加密钥长度是如何增加安全性的。从上面算法演示的过程中,我们知道,如果分解了N=p*q,那么相当于破解者也可以从第一步开始算得私钥指数d,此时该密钥对也就被破解了。目前,最行之有效的分解大数的方式是 Pollard rho算法,对于一个合数M,若其因子包括$m_1,m_2,…,m_k$,那么其期望的时间复杂度为$O(\sqrt{min(m_1,m_2,…,m_k)}$,而在实际的分解中,其时间复杂度往往能达到$O(\sqrt[4]{min(m_1,m_2,…,m_k)}$. 对于大数N=p*q 而言,其期望的时间复杂度为$O(\sqrt[2]{min(p,q)}) \leqslant O(\sqrt[4]{N})$。对于2048bits的RSA密钥来说,其消耗时间依然高达$t\cdot 2^{512}$,底线在$t\cdot 2^{256}$上,t为一个常数,所以目前在传统计算领域是安全的。

要注意的是,p与q不能过于接近,否则可以先计算$n = \sqrt{N}$,然后从n开始做减法计算因子。这种分解方法叫做费马因式分解。从上面的时间复杂度分析上我们可以知道,只要保证$|p-q|>\sqrt[4]{N}$即可让费马因式分解的期望时间长于Pollard rho算法。

量子安全性

秀尔算法(Shor’s algorithm)和量子计算机的出现似乎打破了RSA的安全神话。这项技术为因数分解提供了指数级加速,足以在多项式时间内分解N,然而其需要量子计算机有与RSA密钥位数相当的量子比特,而量子比特的增加是非常困难的。目前已知的具有最多量子比特的量子计算机为IBM Eagle(2021年),其拥有127个量子比特。

目前公开的对RSA的最佳量子破解也来自IBM,他们使用量子计算机成功分解了15=5×3.

不能被证明的安全性

事实上,RSA安全性的核心,也就是因数分解的困难性,并没有被证明。该问题是一个NP问题,可以在多项式时间内验证p*q=N,但不能在多项式时间内算出N的两个因子p和q.

P=NP这个问题至今没有答案,但RSA等非对称加密手段都是建立在P!=NP的基础之上的,由于没有人能拿出一个行之有效的破解方式,所以大家相信破解RSA是不可能的. 值得一提的是,一个调查显示,知道NP问题的人中,多数人相信P!=NP.

当然,即使哪天证明了P=NP,如果只有证明而拿不出有效的算法的话,我们或许依然可以焦虑地使用RSA等非对称加密手段.

工程实现

RSA生成的5个步骤中,第2,3,4步都很简单,然而第1步和第5步则需要特殊的算法实现。

本文会提供切实可行的实现方式,要求如下。

Windows 10及以上操作系统,Python >= 3.7,Python无需安装第三方库。

选择Python是因为其原生提供无限长度的整数运算,无需额外关注具体的大整数运算实现,同时具有较好的可读性。

对不了解Python的朋友,我在下方简单的介绍一下下文中使用的功能。

  • a ** b: 计算a的b次方,等于$a^b$.
  • pow(a, b, m): 计算a的b次方模m,等于$a^b \mod m$.
  • public_key = N, e: 将N和e”打包“进变量public_key中。
  • N, e = public_key: 将N和e从变量”public_key“中解包出来。
  • for var in range(n):循环n次,var会从0赋值到n-1,不关心var的值时可以使用 _代替。
  • random.randint(m, n): 生成从m到n的整数,包括m和n.

Miller-Rabin 算法

演示中使用了两个小质数257和379,这两个数即使是使用纸笔的人类也能验证其并非合数。然而在实际应用中,我们面对的是$10^{308}$这种数量级的质数,仅凭试除法肯定是是不能判断其是否为质数的。此时就需要一个多项式时间的算法来判断该数是否为质数,也就是米勒-拉宾素性检验算法(Miller-Rabin primality test)。

米勒拉宾算法是一种概率性的检验算法,其并不能绝对判断某数是否为质数。但是在合理选择迭代次数的情况下,其错误率是可接受的。下面给出Miller-Rabin算法的Python实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def miller_rabin(n, k):
# n必须为一个奇数
if n % 2 == 0:
return "composite"
s = 0
m = n - 1
if m % 2 == 0:
s += 1
m //= 2
d = (n - 1) // (2 ** s)
# 找到 s > 0, d > 0 且 d 为奇数, 使得 2 ** s * d = n - 1
# 由于 n 为奇数,所以 s 至少为 1,同时当 s 足够大时总能找到一个奇数 d 满足上面等式
for _ in range(k):
a = random.randint(2, n - 2)
x = pow(a, d, n) # 快速幂取余,相当于 (a ** d) % n
# 定理1. 如果 n 是一个质数,对任意整数 x,当且仅当 x = 1或x = -1时,(x ** 2) % n = 1
# 定理2. 如果 n 是一个质数,对任意整数 a,总有 a ** ((2 ** s) * d) = 1 (mod n)

for _ in range(s - 1):
x = pow(x, 2, n)
if x == 1:
return "composite"
if x == n - 1:
break
if x != n - 1:
return "composite"
return "probably prime"

Miller-Rabin 算法给出了一个时间复杂度为$O(k\cdot \log^3n)$的算法,它能有效的检查一个数是否为质数,其中k为循环次数,log来自于快速幂取余的时间复杂度。而其将一个合数误报为质数的概率为$4^{-k}$,因此很容易将在线性时间内以足够高的准确度预测n是否为一个质数。OpenSSL中,k默认为20.

质数分布与大质数生成

一个显而易见的事实是,除了2和3,质数n必然满足 $n = 6k + 1$或$n = 6k-1$,其中k为正整数。

当生成一个n bits的质数时,实际上是在$[2^{n-1}, 2^n)$中找一个整数,验证是否为质数。

由于质数分布规律较为复杂,其生成时间具有较大的不确定性。不过,根据质数定理,对于一个自然数N,其前面的质数大约有 $\frac{N}{\ln N}$个。

那么,如果生成1024bits的质数,我们可以算得在$2^{1023}-2^{1024}$质数的密度如下。
$$
\frac{\frac{2^n}{\ln {(2^n)}}-\frac{2^{n-1}}{\ln (2^{n-1})}}{2^n-2^{n-1}} = \frac{2-\ln 2}{n\ln2} = 1.841 \times 10^{-3} = 543.12^{-1}
$$
可以看到,平均而言,每543.12个数中有一个质数。这个数字不是很大,所以即使暴力搜索也能比较快的搜索到。下面是具体的实现代码。

1
2
3
4
5
6
7
8
9
10
11
12
def gen_prime(n):
# n为要生成质数的位数
num = random.randint(2 ** (n - 1), (2 ** n) - 1)
num = ((num // 6) * 6) + 1 # 6n+1 模式
# 这里设置为5,速度会快很多,安全性上应该设置为较大的值,比如15以上
while miller_rabin(num, 5) != "probably prime":
# 如果不是质数,那就再生成一个
num += 4 # 6n + 5 模式
if miller_rabin(num, 5) == "probably prime":
break
num += 2 # 6(n+1) + 1 模式
return num

测试CPU为i7-7700HQ,生成1024bits的质数,测试20次,平均消耗31.14秒。实际平均约10000个奇数遇到一个质数。和上面的计算相差较大,暂未知原因。

扩展欧几里得算法

第五步中,要求计算 de = 1 (mod λ(N)). 即计算e模λ(n)的乘法逆元。

变形这个算式,可以得到 $de = kλ(N) + 1$,其中d, e, k 均为正整数,e为已知数。

当然,可以从k=1开始,一个个计算符合的d,不过有更快的算法,也就是标题提到的扩展欧几里得算法。

扩展欧几里得算法,顾名思义,其由欧几里得算法扩展而来,而欧几里得算法就是辗转相除法。
$$
gcd(a, b) = gcd(b, a \mod b)
$$
而扩展欧几里得算法,则可以求解裴蜀等式(或贝祖等式)
$$
ax+by = gcd(a,b)
$$
求解裴蜀等式就是当 a, b 已知时,求解一对可以使等式成立的x, y.

我们将求解乘法逆元的算式代入,得到下面的式子。
$$
de-λ(N)\cdot k = 1
$$
接下来就可以使用扩展欧几里得算法求解d了。

1
2
3
4
5
6
def ex_gcd(a, b):
if a == 0:
return b, 0, 1
else:
gcd, x, y = ex_gcd(b % a, a)
return gcd, y - (b // a) * x, x

简单地说,ex_gcd返回的第二个值就是要求的私钥指数d.

自己生成一个密钥对

上文中给出了生成一个密钥对所需的全部算法。接下来我们可以自己生成一个可以使用的密钥对了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def fmt_big_int(name, num):
max_len = 30
name += " = "
indent = len(name)
num = str(num)
if len(num) <= max_len:
return f"{name}{num}"
num_l = []
for idx in range(0, len(num), max_len):
num_l.append(num[idx:idx + max_len])
return name + " \\\n{}".format(' ' * indent).join(num_l)


def generate_key_pair():
bits = 1024
p = gen_prime(bits)
q = gen_prime(bits)
N = p * q
lambda_N = math.lcm(p - 1, q - 1)
e = 65537 # 通常e会选择65537,这几乎不会和lambda_N有公因数
gcd_e = math.gcd(e, lambda_N)
if gcd_e != 1:
print(f"真不巧e和λ(n)有公因数{gcd_e}")
return
_, d, _ = ex_gcd(e, lambda_N)
if d < 0: # 可能算出负数来,算出来了就加上,避免私钥指数为负数。
d += lambda_N

print(fmt_big_int("N", N))
print(fmt_big_int("p", p))
print(fmt_big_int("q", q))
print(fmt_big_int("e", e))
print(fmt_big_int("d", d))

下面是我生成的密钥对。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
N = 227442712852228682420414779319 \
562687651643739341662796692929 \
163922400267934787590621116694 \
891760619327203004901144906832 \
800095526049668470542778999323 \
651897322671119077281968001372 \
446392959926682893287212273771 \
241722129942437331580897662308 \
578666249068303886306662169412 \
193217352311366461532975812852 \
124193228404641158708725065274 \
708644109118939811612576529812 \
846988041909580026981490541567 \
445743128463667379869830312310 \
386152013370124058632684548068 \
516907871220638110433917758192 \
056516147025352893608425414982 \
659378237947057838512573683719 \
656010305302908753593858876575 \
962355149895528849120178912224 \
35170357514424413
p = 145051038077958860693796012628 \
625876634665931949940050370449 \
387104965384223153152471552741 \
833745166885914511061893077670 \
464028682608703752485689595048 \
619321249150617970591218334362 \
527527974241212468108567074763 \
798879903619803151999405468786 \
884736416473525334019578812862 \
866293752484163777592143766285 \
811404063
q = 156801851173231690833526980220 \
070443094216022723233530479768 \
612945390163310468787991953168 \
708616715527769639602060536294 \
556776062794179272183786104176 \
586288917546573301530873029141 \
344273752018571280068748133198 \
532396742562461156175632492260 \
503161905280147990066527029768 \
573516310751136244023374550179 \
197384451
e = 65537
d = 190701085970214780948194029687 \
199134633227390280671539409439 \
821132122232067634742277345047 \
596048736317344478986189673473 \
951588402832434845298468132701 \
140909072444237503954165458831 \
132479258250625219130145024089 \
136405862952788979818580748948 \
783095204026783321674032778570 \
883886865579895128877382561243 \
636791701404807498474376829463 \
986075399201337437286982733483 \
883375774171899112093369852188 \
022751835849225142095728136344 \
556634868177611264216881972938 \
769052035843741360883732498917 \
623652545912814027557437888976 \
645651446879691031770018258055 \
933872022471957950567451266956 \
588963326221579878715175692341 \
743003401618123

在实践中,不要使用文章里的方式生成密钥,这只是作为演示使用。虽然文章完整的复现了RSA的生成算法,但是很多地方并没有考虑。比如没有检查p和q是否过分相近,Miller-Rabin检查次数过少,没有使用安全的随机数生成器等等,它虽然是一个可用的RSA密钥对,但是并不具有足够的安全性。

尝试使用自制的密钥对进行加解密

现在我们已经具备生成一个RSA密钥对的能力了,可以尝试使用生成的密钥对进行数据的加解密。

注意:这种加解密方式是不安全的 ,不要在实践中使用这种方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def big_int_to_bytes(num):
out = []
while num > 0:
out.append(num % 256)
num //= 256
return bytes(out)


def bytes_to_big_int(content: bytes):
s = 0
for i, byte in enumerate(content):
s += byte * (256 ** i)
return s


def encrypt(public_key, msg: bytes):
N, e = public_key
msg = bytes_to_big_int(msg)
if msg > N:
raise ValueError("msg过长,无法编码")
cipher_text = pow(msg, e, N)
return big_int_to_bytes(cipher_text)


def decrypt(private_key, cipher_text: bytes):
N, d = private_key
cipher_text = bytes_to_big_int(cipher_text)
if cipher_text > N:
raise ValueError("密文过长,无法解密,可能不是密文。")
plain_text = pow(cipher_text, d, N)
return big_int_to_bytes(plain_text)


def test_rsa():
N = 227442...4413 # 真实数字在上面,这里太长就不放了
e = 65537
d = 1907...123
msg = b"Hello, I'm Bob."
print("msg = ", msg)
cipher_text = encrypt((N, e), msg)
print("cipher_text = ", cipher_text)
decrypted_text = decrypt((N, d), cipher_text)
print("decrypted_text = ", decrypted_text)


加密与解密的结果

1
2
3
4
msg =  b"Hello, I'm Bob."
cipher_text = b'\x0f\x9b\t...@\xc5\xadc\x97\x9a'
# 加密的密文比较长,就不全写了,可以自己复制测试
decrypted_text = b"Hello, I'm Bob."

RSA特性

从上面的算法讲解,可以发现RSA有很多特性。

公钥与私钥的对称性

上文中,我们并没有对公钥指数e做出什么特殊要求,仅仅是看似随机的选取了65537这个数,它并没有成为公钥指数的必然性。而在加密和解密章节中,这种对称性则更加明显,除了变量名和报错信息有区别,它们完全是一样的。

任何与λ(n)互质的数都可以作为公钥e。而解密密钥d则是使用扩展欧几里得算法得到的。回到求解私钥的要求中。
$$
ed ≡ 1 (\mod λ(N))
$$
可以看到公钥与私钥时对称的,当把选取的数字作为私钥指数时,那么计算得到的就是公钥指数了。事实上,RSA三人组最初的论文中,选取的数字就是作为私钥指数,而计算得到的则为公钥指数。

但是,将选取的数字作为公钥具有一定的好处。选择一个很小的数字作为公钥指数,私钥指数就会非常大,这为破解增加了难度。

交换公私钥的安全问题

假如以d作为公钥指数,那么私钥指数e就只有65537了,此时如果知道d和N,就可以对e进行暴力破解。下面是对交换公私钥后成功的暴力破解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def crack_unsafe_private_key(public_key, cipher_text):
# 假设 Alice 加密的是以ASCII编码的明文
N, e = public_key
cipher_text = bytes_to_big_int(cipher_text)
for d in range(3, 100000):
probably_plain_text = pow(cipher_text, d, N)
probably_plain_text = big_int_to_bytes(probably_plain_text)
if probably_plain_text.isascii():
print("破解出明文可能是: ", probably_plain_text, "d = ", d)
return


def test_crack():
# 对当前的公钥选择策略而言,交换公私钥后,其不能保证安全
N = 227442...4413
e = 65537 # e仅作为展示,此处假设它是隐藏的私钥指数
d = 190...123 #让算出来的d作为公钥,此时不仅加密速度变慢,而且会不安全
# 这里反向利用,将计算出来的d作为公钥指数,而e作为私钥指数
cipher_text = encrypt((N, d), b"Hello, I'm Alice.")
crack_unsafe_private_key((N, d), cipher_text)

运行结果如下

1
破解出明文可能是:  b"Hello, I'm Alice."  d =  65537

可以看到,2048bits的密钥很容易就被破解了。所以虽然公私钥具有对称性,但是并不能真的将随便一个数作为私钥指数。使用私钥加密,公钥解密并不能保证秘密消息的安全性。

数字签名

然而私钥加密依然有其用武之地。只有使用私钥加密的信息,其他人才可以使用公钥解密。利用这个特性,Alice可以将其要公布的明文信息与该信息被私钥加密的密文一起公布,这样任何持有Alice公钥的人都可以验证消息来自Alice。Alice的动作叫做签名(Sign),而验证者Bob的动作叫做验正(Verify)。

1
2
3
4
5
6
7
8
9
10
11
def sign_and_verify():
N = 2274427...413
e = 65537
d = 190...123
msg = b"Hello, I'm Alice"
cipher_text = encrypt((N, d), msg)
print("msg = ", msg, "cipher_text = ", cipher_text[:10] + b"...")
# 验证签名
decrypted = decrypt((N, e), cipher_text)
print("decrypted = ", decrypted)
print("is decrypted == msg :", decrypted == msg)

函数输出如下。

1
2
3
msg =  b"Hello, I'm Alice" cipher_text =  b'\xd0@;2\x995X\x7f\x1e\x94...'
decrypted = b"Hello, I'm Alice"
is decrypted == msg : True

而如果有人篡改了明文或者篡改了密文,那么解密后的密文与明文会不同,也就验证消息没有来自Alice。

在实际应用中,文件通常很大,通常总会超过密钥能加密的最长长度。而分段虽然可以,但是需要下载两个内容完全一样的文件,消耗一倍的流量。所以实践中,会先对文件求一个加密哈希(如SHA-256),然后对哈希值做加密,这样签名的体积就会小很多。验证者只要解密出哈希后,再对原文件做哈希,比对两个哈希值是否相等即可

明文填充

可以看到,如果我们把RSA明文转换为正整数加密,那么1显然是无法被加密的。为了避免这种情况以及增加安全性,出现了明文填充 (Padding)。Padding 的目的为尽可能提高安全性,以及提供验证消息是否被正确解密的方法。选择一个好的Padding可以增强RSA的安全性。

比如在交换公私钥的安全问题提到的暴力破解中,其实只要解码出任意一个字符是非ASCII字符即可跳过,这提高了破解速度,而一个好的Padding,必须要解码全部消息才能够验证消息是否被正确解密。

不过Padding不在本文的讨论范围内。

实践中使用RSA

这篇文章主要介绍RSA的实现及其算法,不会对如何使用RSA工具做赘述。但本文可以提供一些工具的简单介绍。

下面这些工具很多不止提供基于RSA的非对称加密,也有基于其他算法的非对称加密。

OpenSSL

这是世界上最主流的非对称加密软件,内置在许多操作系统中,Chrome浏览器完成HTTPS使用的也是OpenSSL的一个分支版本BoringSSL。

下面的命令可以在Windows上使用(PowerShell),应该也适用于Linux及Mac。

1
2
3
4
5
6
# 生成一个1024bits的质数
openssl prime -generate -bits 1024
# 生成一个2048bits(3.1.10 默认)的RSA密钥
openssl genrsa 2048
# 生成一个2048bits rsa密钥,并以人类可理解的方式打印
openssl genrsa 2048 | openssl rsa -text

可以看到第三个命令打印了了 exponent1 和 exponent2,以及coefficient三个本文没有介绍过的值。

这三个值是从私钥计算而来,目的是加快私钥解密速度,并不对RSA算法起实际作用,删去它们不会对算法的正确执行造成任何影响,只是会慢一些。

PGP

PGP原本是一个使用非对称加密的商业应用程序,但随着时间进展,其已经变成了一套标准,被称作OpenPGP,有很多符合该标准的开源软件。

Windows平台有Gpg4win,这是一套软件,可以提供邮件、文件的加密与验证。

Android有OpenKeyChain,也可以较为简单的使用。

在线版本

也有一些网页提供了网页版RSA加解密,比如这个网站就提供了RSA生成与加解密的功能。

Python哈希冲突浅析

Python字典是一种常用的数据结构,内部采用hashmap数据结构,利用元素的hash值作为索引。

字典实现

首先实现源码在 Github:Objects/dictobject.c

与Java的HashMap不同,Python字典解决冲突的机制并非拉链法,而是开放定址法。插入元素index冲突时会采用一种特殊的算法来计算新的索引,直到找到空位为止。

当字典已使用空间达到2/3时,继续插入会触发扩容,通常初始字典的大小为8,扩容大小为原来的2倍大,即字典的slot大小只会是2的幂数。然而字典中也会有其他元素,所以实际占用空间并非2的幂数。

哈希冲突

oCERT-2011-003提出了一种哈希冲突的问题,即故意制造大量冲突对服务器造成DOS攻击。该漏洞广泛影响了多种编程语言,包括当时的Java、Python、Ruby、PHP,不过C、C++由于当时没有标准库内置hashmap幸免。

漏洞的根源是由于这些语言使用非加密hash函数,导致攻击者很容易执行原相攻击和第二原相攻击。通常语言内置hash函数的结果只会被作为hashmap、hashset等数据结构的索引使用,并不用来执行与机密数据相关的操作,所以语言采用非加密hash无可厚非。然而攻击者可以利用这个漏洞恶意制造大量相同hash值的不同对象,迫使hashmap退化为线性结构而引发DoS攻击,严重的可引起服务器宕机。

如今,许多语言都对这种方法做了防范,例如Python在对str和bytes进行hash时会采用一个随机值“加盐”,这个随机值被称为hash seed,hash seed在同一进程中保持一致,但在独立启动的不同进程中是随机的。这24字节的随机值可以保证网络上的攻击者无法发动上述攻击。而如果遇到可能由于hash函数导致的bug,也可以通过设置PYTHONHASHSEED环境变量的值来确保每一次hash结果相同来复现bug。

其它语言中,Java会在hash冲突过多时将冲突的部分转化为红黑树来保证速度不会过慢。Rust直接采用了加密hash,但也导致了在需要大量计算hash时速度有明显劣势,需要调整编译flag以采用常规hash。

在Python官方文档中,其表明精心构造的输入可能会导致字典插入时具有O(n^2)级别的时间复杂度,然而我查阅了字典实现的源码后发现,其可能拖慢插入速度的总共有两个函数,分别是find_empty_slot:1165dictresize:1405,前者寻找空槽明显可以看出是线性复杂度,而后者则是字典扩容,虽然需要重新获取key对象的hash,但是可以看到也是线性的复杂度。同时前文提到扩容为倍增扩容,大量输入时一段时间内的平均触发次数只会越来越少。从理论上讲是不会有平方复杂度的。

但实际情况可能和理论有所差别,所以我写了一段测试代码用来测试插入时间。测试的机器CPU为 i7-7700HQ ,内存为 DDR4 2133MHz ,下面是测试代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import time


class TestHashConflict:
def __hash__(self): # 模拟恶意构建的冲突hash,永远返回100作为hash值
return 100


class TestNiceHash:
def __hash__(self): # hash自己的id(类似hash指针),是一种简单的hash方法
return hash(id(self))


def add_objs(n, dic, factory): # factory不同,使用的对象也不同
start = time.perf_counter() # 使用perf_counter 计算hash时间,避免time.time()不精确造成的问题
last = start
for i in range(n):
o = factory()
dic[o] = i
if i % 1000 == 0: # 每添加1000个计时一次
now = time.perf_counter()
print(f"{i}\t{now - start}\t{now - last}")
last = now
end = time.perf_counter()
print(f"end time:{end - start}")


def main():
n = 1_000_000
add_objs(n, {}, TestNiceHash) # 先测试添加一百万个普通的hash对象,总用时大约0.933秒
add_objs(n, {}, TestHashConflict) # 一百万个没添加完,添加到90万时总用时大约2.5万秒


if __name__ == '__main__':
main()


这里我强行定义TestHash对象的hash值永远是100,而不同对象并不相等。同时为了说明计数和打印的代码几乎不会对性能造成影响,同样插入一百万hash自身id的对象模拟正常对象。在空字典中插入一百万正常对象时总用时只有约0.933秒,而插入一百万hash相同的对象时总用时高达数万秒,事实上我只插入了90万hash相同的对象,总用时已经来到了2.5万秒左右。将数据导入execl表中,可以看到平均插入的时间确实是预期的O(n)。这里我没关垃圾回收器,尖峰可能是垃圾回收导致的。

image-20220611200444050

采用加密货币思想进行HTTP流控

流控是常见的功能,很多重要位置都有流控。

为什么进行流控

在实际生产中,我们的很多资源都是有限的,比如视频转码的CPU以及GPU资源、流量配额、邮件配额、数据库连接等,这些资源如果不提前预计分配,导致某一配额用光或队列爆满,就会导致服务暂时甚至长时间的处于不可用状态,会严重增加维护成本。

当然,如果我们面对的都是普通用户,那么在正确估计了用户量的前提下,无论用户是网页查看还是API调用,都是很难发生资源耗尽这种事情的。但如果有人对我们进行CC攻击,情况就不一样了,恶意攻击往往会对常见的资源最紧缺的地方发起,让我们必须使用一些手段来保证资源不被迅速耗干。在仅能由客户端或网页访问的地方,我们可以部署不同的人机验证程序,但在公开的API中,我们就需要流控手段来限制访问频率。

常见的流控手段

常见的流控手段有计数器算法、漏桶算法、令牌桶算法等,不过在这里我只是提一下名字,这些传统算法并不在这篇文章的讨论范围内。

加密货币思想

加密货币在前几年风靡互联网,其核心的思想就是工作量证明(POW),通过计算和验算所需的算力不对等来保证加密货币的货币属性。而服务器也可以通过加密货币的核心思想要求调用接口客户端证明一定工作量,而证明工作量则需要一定的时间,通过这种方式达到流控的效果。

举一个简单的例子。

首先服务器随机生成一个token并保存,将token传给客户端后,让客户端随机生成nonce,满足 sha256(token+nonce) < 0x000...fff。由于加密散列的抗原相特性,客户端唯一的方法就是重复生成nonce直到满足要求,然后将nonce和token传回服务器,而服务器仅需验证一次即可。

当然,服务端也可以采用AES(timestamp+token)的方式进一步减少储存token的内存要求,选用更快的加密散列函数(如blake2b)而设定更小的范围来进一步减小对CPU的要求。

特点以及为什么没人用

通过POW,确实是一种与硬件绑定的、难以绕过的验证方式,这种方式也平衡了只有少量客户端的家用IP与有大量客户端的公司、学校IP,使得不同IP下的用户有着较为一致的体验。在面对攻击时,如果攻击者没有足够的算力也难以破解。

然而它的缺点也同样非常明显,首先是算力,即使可以用wasm最大化CPU算力,同样面临着不同机器算力不同的问题,手机显然要比高配电脑算的更慢,然而当前的环境下,手机往往是访问的主要流量来源,这就让这种方法的实用性进一步下降。而对于防范被攻击来说,基于IP的令牌桶算法已经具有较好的效果了,对人类使用的接口,还可以增加reCAPTCHA这样的人机验证装置,这种方式的实际适用范围很小。

概念验证代码

这篇文章算是群友的突发奇想,我在此附上一个概念验证代码,由python编写。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import hashlib


def inc(b: bytearray):
i = 0
while i < len(b):
# 这里采用的是大端序自增,实际上无论怎么自增影响都不大
tmp = b[i] + 1
b[i] = tmp % 256
if tmp % 256 == 0:
i += 1
else:
break


def client_calc(token: bytes, target: bytes):
nonce = bytearray(b"\0" * 16)
while True:
res = hashlib.sha256(token + nonce).digest()
# 客户端不断计算计算新的值
if res < target:
print("[Client]\tFound: {}, nonce: {}".format(res.hex(), nonce.hex()))
# 满足条件返回
return nonce
inc(nonce)


def fake_client_calc(token: bytes, target: bytes):
print("[Client]\tFake calc")
# 如果不计算而返回一个假的就会变成这样
return b"fake nonce"


def server_run(calc_func):
print("[Server]\tcalc_func: {}".format(calc_func.__name__))
token = b"token token" # 此处可以随机生成,这里仅作演示

# 要小于这个才能认为通过,如果运行慢可以适当增大
target = bytes.fromhex("00000fffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")

nonce = calc_func(token, target) # 这里模拟将token发送给客户端并让其计算
digest = hashlib.sha256(token + nonce).digest()
if digest < target:
print("[Server]\tFound: {}, nonce: {}".format(digest.hex(), nonce.hex()))
else:
print("[Server]\tInvalid nonce: {}".format(nonce.hex()))


def main():
server_run(client_calc)
print()
server_run(fake_client_calc)


if __name__ == '__main__':
main()

Cloudflare 导致的Range头失效

Cloudflare引起的问题

在配置服务器上的Nginx时,发现无论怎么配置都无法分段下载,并且并没有返回错误,通过 curl -I -R "Range: byte=100-200" https://my.domain.com/files/xxxx.mp4,服务器返回的的HTTP头也并非206而是200,而在服务器后台查看 access.log 发现log的返回值为200,而正常的206,多次修改配置无果后,我怀疑是其他中间件导致的问题。
由于我启用了Cloudflare反向代理,它自然成为了我的第一个怀疑对象。起初我的怀疑原因很简单,可能是由于Cloudflare的缓存没有刷新导致的问题。
于是打开了cf的开发模式,继续尝试,发现可以使用IDM分段下载了,同样的curl命令也会返回206了,于是我清除缓存后关闭开发模式继续尝试,发现又不能分段下载了。
## 实际原因
Cloudflare确实会缓存文件,然而根据客户协议,除非单独购买,否则一部分文件不会缓存(类似视频等),而这是通过扩展名进行判断的,所以URL的扩展名一定条件后就会触发这个机制,让Cloudflare丢弃客户端请求的Ragne头,服务端看不到,自然也就无法支持分段下载。
然而这是我根据社区问答中的只言片语和实验猜测出来的结果,我并没有找到描述其实际行为的文档。
### 参考
Cldouflare社区问答
服务条款 在该服务条款的2.8节

服务条款2.8的英文原文

The Services are offered primarily as a platform to cache and serve web pages and websites. Unless explicitly included as part of a Paid Service purchased by you, you agree to use the Services solely for the purpose of (i) serving web pages as viewed through a web browser or other functionally equivalent applications, including rendering Hypertext Markup Language (HTML) or other functional equivalents, and (ii) serving web APIs subject to the restrictions set forth in this Section 2.8. Use of the Services for serving video or a disproportionate percentage of pictures, audio files, or other non-HTML content is prohibited, unless purchased separately as part of a Paid Service or expressly allowed under our Supplemental Terms for a specific Service. If we determine you have breached this Section 2.8, we may immediately suspend or restrict your use of the Services, or limit End User access to certain of your resources through the Services.
大意:服务主要作为缓存和服务网页和网站的平台提供。 除非明确包含在您购买的付费服务中,否则您同意仅将服务用于 (i) 提供通过网络浏览器或其他功能等效应用程序查看的网页,包括呈现超文本标记语言 (HTML) 或其他功能等效物,以及 (ii) 提供受本第 2.8 节规定的限制的 Web API。 禁止使用服务来提供视频或不成比例的图片、音频文件或其他非 HTML 内容,除非作为付费服务的一部分单独购买或根据我们针对特定服务的补充条款明确允许。 如果我们确定您违反了第 2.8 条,我们可能会立即暂停或限制您对服务的使用,或限制最终用户通过服务访问您的某些资源。

解决方法

由于请求在转发至服务器的时候Range标头就被丢弃了,这使得我可能无法通过Cache-Control的响应头来控制缓存行为(未实验),所以我选择使用路径规则,当匹配部分字符串时则绕过缓存。
具体做法为登录cf中登录网站的控制台,在 规则 > 页面规则 中选择创建页面规则,配置需要的路径后将缓存级别设为绕过。

python多进程随想

为什么Python多进程入口需要用if __name__ == '__main__'包裹呢?
实际上,如果在Linux上运行,通常情况下即使没有这行判断也并不会造成问题。
因为在类Unix系统中,程序可以通过fork创建自身的副本,这也是Python在Linux系统下产生子进程的默认行为,此时作为副本的子进程和父进程的内存是完全一致的,程序只需要通过fork的返回值判断是父进程还是子进程后,子进程运行指定函数,父进程继续向前运行。
然而在Windows中,程序只能通过 spawn 方法来执行一个全新的子进程,内存不共享,Python只能重新加载所有脚本,通过 pickle 给子进程传递相应的被序列化的函数对象执行,如果不做限制,就可能导致无限递归地产生子程序。不过Python3做了防备,不会出现这种无限递归的问题,会直接报错。而Python2则会无限递归耗尽系统资源。
类似的原因,你不能在Windows中将闭包函数作为多进程的run target参数,因为闭包在Python中不能被序列化,然而Linux通过fork调用,子进程获取了父进程的内存,所以是可以使用闭包作为target参数的。
同样的,一些可变的全局变量在不同系统中表现也不同,也许会有意想不到的错误。
有空附上源码分析和

最后附上一个原型验证代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import multiprocessing
import os
import time


def test_a():
print("print in test_a name:{} pid:{}".format(__name__,os.getpid()))
if __name__ != '__main__':
print("sleep 1s" )
time.sleep(1)
print("sleep over" )
def test_closure():
def test_b():
# 当然,python没有静态优化,即使这个闭包没有使用所在函数的任何属性,也依然不能被序列化
print("print in test_b name:{} pid:{}".format(__name__,os.getpid()))
if __name__ != '__main__':
print("sleep 1s" )
time.sleep(1)
print("sleep over" )
return test_b

def main():
# __name__ 在主进程中为 '__main__', 在子进程中为 '__mp_main__'
# 此处的注释无特殊说明都是描述的在Windows系统下的情况
print("create process object{}".format(__name__))
# 当target为 `test_a` 时,如入口没有 if __name__ == '__main__'保护,则子进程运行到下面的 `p.start()` 处报错。
# p = multiprocessing.Process(target=test_a)

# 当target为 `test_closure()` 时,无论有无保护主进程都会在运行至`p.start()`处报错,因为闭包结构并不能被Python序列化,也就无法正确产生子进程。
p = multiprocessing.Process(target=test_closure())

print("start process{}".format(__name__))
p.start() # 主进程或子进程运行到此处时会报错
print("join process{}".format(__name__))
p.join()


if __name__ == '__main__': # 如果删去本行判断,产生的子进程运行到 `p.start()` 时会报错
print("before main, __name__ is :{}".format(__name__))
main()
print("after main, __name__ is :{}".format(__name__))

VSCode 远程开发技巧

在越来越多的插件的加持下,VSCode的功能逐渐变得强大起来,虽然在Python代码提示、高亮和静态检查上不如jetbrains家的IDE,但是jetbrains的IDE想要进行远程开发,必须要购买专业版,此时拥有免费的远程开发插件则让VSCode成为了我远程开发的首选。本文只记录在Windows上通过SSH连接Linux进行开发的部分技巧及备忘。

首次配置

配置VSCode远程开发,首先需要安装微软提供的远程开发插件 Remote - SSH,安装完成后,如果是win10以下的系统需要安装OpenSSH,而Win10自带,不需要手动安装。

配置文件

配置文件是SSH连接的核心文件,VSCode需要根据配置文件查找远程主机并连接,配置文件的格式与SSH config文件格式相同,使用#作为注释的开头,我仅列举我常用的一些字段。

  1. Host: 主机名称,一个备忘的名字,空格需要引号,有特殊字符。
  2. HostName: 主机地址,IP或者域名。
  3. User: 登录的用户。
  4. IdentityFile: 使用密钥对方式登录时的私钥。
  5. ProxyCommand: 如果使用代理命令需要该选项。
  6. Port: 如果要连接的主机SSH端口为非常规的22端口,则应该设置Port字段。
    一个常见的配置文件样式
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    Host my_lan_host
    HostName 192.168.1.10
    User daenerys
    Port 7654
    IdentityFile ~/.ssh/lan_host.key

    Host tencent_cloud
    HostName 100.100.100.100
    User root
    IdentityFile ~/.ssh/tc.key

    Host american_host
    HostName 99.99.99.99
    ProxyCommand "C:\bin\nmap-7.70\ncat.exe" --proxy-type socks5 --proxy 127.0.0.1:1080 %h %p
    User adminadmin


配置文件权限问题

在默认情况下,VSCode使用OpenSSH的config文件,路径是%USERPROFILE%\.ssh\config,而如果你在此前没有使用过OpenSSH,则可能会会有一些权限上的问题,VSCode并不检查该文件的权限,而OpenSSH则会检查权限,如果该文件的权限不符合要求则不会使用该文件。此时会出现如下的错误提示。

1
2
C:\Users\your_user_name>ssh 10.0.100.100 rem 这里当然是你要连接的ip
Bad owner or permissions on C:\\Users\\your_user_name/.ssh/config

此时有两种解决方式,一种方式是修改config文件的权限,另一种是修改VSCode配置。

  • 解决方式1
    找到该文件后,右键属性 > 安全 > 高级,选择禁用继承,在弹出的对话框里选择从此对象中删除所有已继承的权限,此时会显示 所有组或用户均不具有访问此对象的权限。但是对象的所有者可以分配权限。
    此时点击添加,选择主体,高级,立即查找,在下方的用户中找到你当前的用户,确认后为其分配完全控制权限即可。
  • 解决方式2
    可能你并不喜欢修改权限,Windows权限复杂又难以更改,那么可以不修改文件权限,而直接修改VSCode所使用的配置文件即可绕过这一问题。不过如果要配置无密码访问的话修改权限是绕不过的。
    具体做法很简单,找到Remote - SSH插件,点击右下角的设置,第一个选项 Remote.SSH: Config File就是所使用的配置文件了,修改为另一个配置文件即可。

使用代理连接你的主机

并不是所有时候SSH的连接都是通畅的,比如在内网中只能使用HTTP代理来连接公网,或者服务器IP被墙等多种原因,都会让你不得不使用代理来连接,然而遗憾的是,OpenSSH并不原生支持使用常规的HTTP或Socks代理连接,只能使用SSH跳板进行连接。
但是OpenSSH可以使用代理命令,原理则是将原本的SocketIO转为代理程序的stdio,在Linux上可以使用netcat(nc)进行代理,而在Windows上就需要手动下载netcat for Windows,并将绝对路径传给ProxyCommand或将ncat加入环境变量,我此处使用绝对路径。以上面的配置文件为例。

1
2
3
4
5
Host american_host
HostName 99.99.99.99
ProxyCommand "C:\bin\nmap-7.70\ncat.exe" --proxy-type socks5 --proxy 127.0.0.1:1080 %h %p
User adminadmin

其中的特殊字段%h%p,在ssh运行时,这两个字段将会自动被替换为HostNamePort,所以实际上则是启动一个nc --proxy-type socks5 --proxy 127.0.0.1:1080 99.99.99.99 22的程序,并将原本的SocketIO转为该程序的stdio。故你也可以用自己实现的任何其他手段来进行流量转发,而不局限于netcat。

终端配置

默认状态下的终端虽然可用,但对我来说用起来并不是非常方便,还需要进行进一步的自定义配置。
在终端运行程序时,常常需要用到很多ctrl组合键,然而部分组合键并不会转发至shell,而是会激活VSCode相关的功能,尤其是我使用的是IntellJ的键位,问题更加明显,此时需要将Terminal › Integrated: Allow Chords设置为true 。
在启用了鼠标功能的tmux中,鼠标的滑动会被汇报至shell,此时无法正常选择复制。可以按住shift键进行选择和点击(注:点击过程中不能松开shift)
对于右键行为,可以在Terminal › Integrated: Right Click Behavior进行更改。

python自学指南

近年来,Python是一门非常火的语言。我根据我的学习历程,总结了一些我的学习经验。这篇文章会逐渐更新,但现在就写这么多。

自学流程以及教程

  1. 认识黑框框。

    几乎所有的编程语言都要从这个黑框开始,在Windows中被称作cmd(命令提示符),而在Linux、Mac OS中被称作terminal(终端),即使有IDE,在程序实际运行中也免不了和这个东西打交道,所以学习基础的命令操作和认识命令提示符中的提示非常重要,这是任何编程学习的基础。

  2. 安装Python、找一个靠谱的IDE。

    如今,安装Python是一个非常简单的工作,和安装其他软件一样,下载安装包,一路下一步就足够了。

    然后就需要寻找一个IDE了,IDE可以避免很多麻烦的事情。我个人推荐PyCharm社区版,免费,几乎包揽了所有工作,基本不需要配置,支持官方中文。当然,你也可以选择VS Code、Visual Studio或者Notepad++这样的工具。

  3. 学会基本语法。

    初学者学习语言,第一件事就是学习如何让程序运行起来的基本语法。而想让程序能够完成目的,其基础是顺序、分支、循环三种语句。Python中有很多关键字,但这些关键字有些并不常见(例如yield, nonlocal, async 等),不认识也没关系,只要不用就可以了。

  4. 认识一些基本函数。

    Python中有很多内置函数,然而和关键字一样,大多数内置函数并不会在初学中用到(例如globals(), locals(), compile()),这些函数提供了一些Python魔法,但是想正确地使用它们是很难的,所以只需要认识一些常见内置函数就足够了。非常常见的内置函数有print(), input(), open(), int() 等等,在学习的过程中就会掌握足够的内置函数了。

  5. 了解标准库提供的功能,学会常见的标准库使用。

    标准库是一个语言自带的库,Python作为一个自带强大功能的软件,自然也携带了庞大的标准库。这些标准库有的在几十年前非常常用,也有的是新库,选择一些你觉得需要的看就足够了。而选择标准则是这个中文标准库索引中,看介绍你觉得对自己有用的,如果看不懂那就是暂时用不上。

    Python标准库

  6. 认清自己的方向,找到合适的包。

    Python是一个通用编程语言,可以做到非常多的事情。然而术业有专攻,即使是专业的程序员也无法兼顾所有工作。你只需要找到适合你的包即可。

我用过的教程

廖雪峰的Python教程

我也是看他的教程学来的,教程本身非常实用,学完后也能独立地使用Python完成想要的功能,但后期进度稍快,很多东西并没有介绍全,需要自己去其他地方补充。有些东西并不需要学会(比如metaclass),也有些需要慢慢理解(比如装饰器)

菜鸟教程

比起廖大的教程来,菜鸟教程虽然简单,但介绍的东西更细节,可以了解更多Python内置函数,在完成想要的功能时可以更多地借助Python自身提供的功能,简化代码,提高生产力。

Python标准库

平时多看看感兴趣的库,可以学到库的用法,而且可以学会一些专业术语,为搜索提供更恰当的词汇。

如果你想做这些,那么Python并不适合你

尽快找到一份编程工作

去学Java。

首先是市场对Python程序员的需求。Python是最近很火的一门语言。但是请注意,Python的火,并非来源于其在传统领域的大规模应用,而是来自于近年来深度学习工具对其的适配。深度学习工具本体使用C、C++、CUDA编写,并提供了更友好的Python接口,而借着人工智能的热潮,Python成为了近年来热度最高的语言之一。但是研究人工智能需要公司有很大的投入,并且短期内回报很小甚至根本没有,所以小公司很难入局。这也导致了人工智能岗位并不多,同时对应聘者的学历、算法水平具有很高的要求,依靠人工智能应用找到岗位是非常难的。

其次再来说说Python在传统领域的应用。除去像阿里、腾讯这样的覆盖面极广的互联网公司,还有许多依靠互联网来运营业务的公司,而这些公司才是程序员招聘的主力。而对这些公司而言,互联网并不是一个研究领域,而是拓展业务的基础设施。所以他们并不需要人工智能,也不需要新颖的算法和快速更迭的软件版本,他们只需要一个成熟系统的商业解决方案。在这方面,Python是远远不足的,虽然有flask、django这样的框架,但国内针对这些框架开发的工具很少。同时已有的成熟解决方案大量使用Java以及基于Java的Spring框架,这些公司也更可能已经有了一套使用它们的系统,Python很难入局这个方向。

最后就是培训的问题,纵观Python培训的市场,目前主流的培训内容有四个:爬虫、人工智能、数据分析和操作Word、Excel等。而在一些培训网站的官网上则有flask、django的教程。这其中,大多数的培训是在Python近年热度提高以后才出现的,其质量参差不齐,有些更是舍本逐末(参见下文),学会后岗位需求也少。而Java的培训则目标性更强,并且培训机构在该领域耕耘的也更久,质量差的教程基本已经被淘汰。并且教程内容几乎是清一色的JavaEE、Spring框架,同时也有大量成熟的商业级开源项目可供参考,找工作范围也更大。

操作 Word、Excel、PPT内数据

以前我这里写去学VBA,但是没想到现在(修改于2024-09-21)Excel官方开始支持Python了,或许将来内嵌Python指日可待了。

但是VBA仍然占据着存量宏代码的半壁江山,所以学仍然是可以的。

不过如果有以下需求的话,Python确实是更好的选择:

  1. Excel数据来源于网络,需要使用爬虫工具。
  2. 大规模(十万级及以上)数据计算,需要使用numpy、pandas。

游戏开发

去学专业的游戏开发工具,并考虑一下是不是自己还欠缺美工、策划等技能。

有些教程会使用PyGame库进行教学,通过自己实现一个小游戏来激起读者的学习兴趣,同时让学习过程有一个明确的目标,这是一个不错的实践,但是如果想进行正式的游戏开发,那么Python不适合你。首先,Python没有游戏地图、事件等的编辑器,所有的任务都需要依赖手工编写代码完成,这是一个极度拖慢游戏开发进程的做法,在游戏开发中并不可取。相应的,想要开发什么样的游戏,就去找对应的开发工具,比如RPG Maker、Unity、Unreal等工具,不同的工具用于不同的游戏开发场景,你需要根据想开发的游戏选择一个工具并选择学习的语言。

当然,如果你已经熟练使用Python,那么用它开发一个小游戏也未尝不可。但如果你决心要做游戏行业,那趁早转学Unity、UE或者Godot等专业游戏开发工具更好一些。

常见的坑

不要在程序能正确运行出结果前尝试优化程序

有一句话叫做:过早的优化是万恶之源。这句话在软件工程中非常常见。Python本来就不是一门以执行速度快闻名的语言,在尝试优化几微秒,甚至几纳秒的速度前,先把功能正确地完成才是更重要的。同时,在大多数情况下,速度并非瓶颈,此时应该着重代码的可读性优化,让将来你可以看懂自己写了什么,而不是程序的性能。

不要修改Python包中的代码

Python的包,是扁平化的,除了你使用这个包外,其他包也可能使用这个包,如果修改这个包的代码导致了包行为出现错误,很可能会连带导致其他依赖该包的包也出现错误。最好的方式是复制包中的代码到自己的文件中进行修改以达成目的,通常这并不会很难。如果一定要修改,那么新创建一个虚拟环境,在其中修改可以最小化这种行为带来的影响。

不需要学习算法

有些人觉得一定要学算法才是学编程,这必是不可能滴。算法虽然很重要,但不了解算法也不会怎么样,常见的算法Python已经帮你做好了,不必担心算法学习。

不需要了解计算机底层逻辑

都学Python了,你和底层就几乎一点关系都没有了。编程并非是越底层越厉害,如果对计算机有兴趣当然可以研究底层逻辑,但只是学习编程用于完成目的的话我们并不需要了解计算机底层是怎么做的,Python已经帮我们屏蔽了很多复杂且难以理解的底层操作,我们只需要知道它是怎么用的,不用管它是怎么来的。

PEP标准是很好的指导,但不要被其限制

PEP是为了让代码更具可读性,也就是PEP是希望人们更容易读懂Python代码,如果你的功能不遵循PEP标准会更有可读性,那么就不需要遵循。

不要迷信 async/await,最好不要使用

牵扯到网络编程,总有人认为async/await更好,因为它们更快,但是实际情况并非如此。一个重要的原因是:async编程具有传染性。也就是说,如果你使用了async,那么必须所有的操作都使用async才能保证异步操作是通畅的。如果哪里有任何一步被阻塞了(比如调用了requests.get(), time.sleep() 或类似的函数),那么整个协程都会阻塞在此处,其他协程也无法运行,其效率相比多线程陡降。而大多数Python包并没有提供async函数,所以很容易遇到无法解决的性能瓶颈。当然,对于熟练Python程序员来说,可以用多线程或多进程对协程进行包装,但这就失去了使用协程切换速度快的优势,如果不是需要完成基本功能需要使用协程(比如telethon,那么不必追求协程。有关Python的协程,我可能会再写一篇博文。

python协程小记

Python协程中的疑问

Python中的async/await究竟是怎么用的?

有许多教程在介绍中会这么告诉你

1
2
3
4
5
import asyncio
async def demo1():
# 等待5s
await asyncio.sleep(5)
print("睡眠结束")

但是大家可以看到,sleep函数只能使用asyncio包中自带的函数,一旦换成平时使用的time包就会报错。比如下面:

1
2
3
4
5
6
7
import time,asyncio
async def demo2():
# 预期等待5s
await time.sleep(5) #的确是等待了5s
# 报错了:
# TypeError: object NoneType can't be used in 'await' expression
print("这条消息不应该被打印")

并且,实际测试中我们也知道,虽然demo1可以并行,但是demo2根本无法并行,这是为什么呢?

只能用asyncio包里的sleep函数作展示,连个requests包都用不了,这协程到底又有什么用呢?

系统调度与用户程序调度

本来这次我是想先写一些调度方面的科普的,但是调度牵扯到非常复杂的知识,我的积累还不够写出一篇好科普来,所以先写一些简单的表象。

我们知道操作系统负责调度进程与线程,通过操作系统的调度,即使是单核处理器也可以表现得像一个同时进行多任务的电脑,这是普通用户使用电脑的基础。而操作系统进行运行调度的最小单位是线程,所以我们下面仅使用线程进行说明。

线程是由操作系统调度的,用户程序并不能控制操作系统如何调度线程,并且,用户程序也不能得知操作系统何时、如何调度线程。但是,用户程序掌握着从操作系统中获取的部分资源,这些资源可以让操作系统决定调度策略。比如最简单的,如果一个线程调用了sleep()函数,那么操作系统得知这个线程会等待一定的时间,那么在这一段时间内便不会让这个线程运行。此时这个线程便失去了在CPU中执行的权利,我们称其为进入了阻塞状态。当达到预计的睡眠时间后,操作系统会考虑继续运行这个线程(但并不一定立刻运行该线程)。

当然,除了sleep()函数,一些IO有关的函数,比如request.get()也是会通知系统后进入阻塞状态,直到收到第一个TCP数据包为止都会阻塞。但系统调度的不透明,不仅仅表现在用户程序对系统的调度一无所知,系统也不知道用户程序何时会触发调度(原因请参考停机问题),而为了能运行其他程序,会在一定时间或者一些系统内置的条件达到后强行暂停线程的运行,这被称之为挂起

操作系统调度进程是需要消耗时间的,虽然一次时间短到可以忽略,但如果频繁地调度,消耗的时间就会长到无法忽视。而服务器软件常常需要同时处理成千上万个连接,如果采用线程调度的方式,就会有很多时间就会消耗在线程的调度上。同时,也会出现从挂起中恢复后刚运算了十几步,就触发IO进入阻塞这种极端情况。

为了规避这种情况,人们又发明了协程。协程的核心是使用操作系统的非阻塞IO请求,其调度不经过系统,而由程序本身进行。所以一个协程程序只需要有一个线程在运行即可。也因此协程中不允许使用线程睡眠(time.sleep()),不允许使用阻塞的IO请求,也无法利用多核CPU。但是协程的优点就是可以由程序员掌控调度的过程,可以出现一些极端情况。同时程序内部调度不如操作系统调度那般复杂,减少了调度的工作量。

常见问题和解答

相信大家了解到这里,许多问题就会迎刃而解,下面我写一些常见的问题和答案。


Q:为什么协程要有一个入口点?

A:因为你需要将这些协程函数加入程序自己的调度器,没有一个中心调度器的话程序不知道如何运行协程函数。


Q:为什么无法使用传统的网络请求函数?

A:因为传统的网络请求工具使用了阻塞的IO接口,调用它们系统会不再运行这个线程,程序自身协程调度器自然也就没了意义。


Q:asyncio.sleep(n)为什么能实现协程睡眠?

A:这个函数并没有通知系统,而是通知了自己内部的调度器,所以系统不会因此挂起这个线程。


Q:协程的优缺点?

A: 如下。

优点

  1. 由程序员掌控调度时机,调度比操作系统更有效率
  2. 程序的调度器不如操作系统调度器那般复杂,同时不会触发用户态和内核态之间的切换,可以将更多CPU用于执行程序逻辑而不是调度。
  3. 由于调度时机可控,可以省去锁、信号量等工具,无需关心同步问题。

缺点

  1. 具有传染性,也就是如果使用了任何一个协程IO函数,那么所有的其他IO函数都需要换成协程IO才不会阻塞。最后的结果一定是从主函数到其他函数全部是协程函数。
  2. 需要额外的标准库学习成本,需要定制的第三方包。

Q:作者怎么看待Python的协程?

A:协程具有要么不用,要么全用的属性。而现在的Python环境来看,不用是最好的选择。目前协程还是小众的Python产品,配齐全套协程工具并且在需要时随时加入新功能是很难的。就像即使Tornado网络性能占优但大家还是愿意基于Django开发Python后端一样,Python协程还需要继续发展。


Q:那如果我一定要用协程,还要用传统的网络请求函数怎么办?

A:虽然我建议你全部使用多线程来代替,但是如果出于某些原因(比如最实现最基本功能的包使用await),一定要混合也不是没有办法。下面给出一个可行的代码示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import asyncio
import threading
def awaitable(func):
async def _wrapper(*args, **kwargs):
def _runner():
try:
result = func(*args, **kwargs)
# 不能直接使用 future.set_result(result)
# 很大概率会导致无法继续运行!下同。
loop.call_soon_threadsafe(future.set_result, result)
except Exception as e:
loop.call_soon_threadsafe(future.set_exception, e)

loop = asyncio.get_running_loop()
future = loop.create_future()
# 创建一个线程包装器来执行。
t = threading.Thread(target=_runner)
# 启动线程,等待future。
t.start()
return await future

return _wrapper

这是一个装饰器,可以装饰在任何你想要的函数上,装饰上之后就可以使用await调用了。这里举个实际使用的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
@awaitable
def requests_get(url):
return requests.get(url)


async def main():
resp = await requests_get("https://www.baidu.com")
print(resp.text)


if __name__ == '__main__':
asyncio.run(main())

当然有经验的读者一定能看出,这不过是一个多线程await包装器而已,实际上还是多线程,并没有协程的优势,但这种方式的确能暂时规避这个痛点。所以我希望大家尽量不要使用该方法,直接用多线程就好了,连Java十万并发都直接靠系统线程调度呢,在Python这性能真的不是很重要的。


Q:除了Python还有其他支持协程的编程语言吗?

A:当然有,多的是。常见的语言级支持比如C#、Rust、Kotlin、JavaScript。同时C也可以通过外部包等方式进行协程开发,C++也计划于C++20中支持协程。

  • Copyrights © 2019-2025 Ytyan

请我喝杯咖啡吧~