Pipeline Style Map-Reduce in Python

Since C++20, C++ provides a new style of data processing with lazy evaluation, built by chaining one iterator onto another.

#include <ranges>
#include <vector>

int main() {
    std::vector<int> input = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    auto output = input | std::views::filter([](const int n) {return n % 3 == 0;})
                        | std::views::transform([](const int n) {return n * n;});
    // now output is [0, 9, 36, 81], conceptually
}

Can we use this style in Python? Yes! :D

Evaluation of Operators

In Python, any expression that involves a binary arithmetic operator, e.g. a + b, is evaluated by the following rules:

  • If the (forward) special method, e.g. __add__, exists on the left operand
    • It’s invoked on the left operand, e.g. a.__add__(b)
    • If the invocation returns a meaningful value other than NotImplemented, done!
  • If the (forward) special method does not exist, or the invocation returns NotImplemented, then
  • If the reverse special method, e.g. __radd__, exists on the right operand
    • It’s invoked on the right operand, e.g. b.__radd__(a)
    • If the invocation returns a meaningful value other than NotImplemented, done!
  • Otherwise, TypeError is raised
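
The rules above can be sketched as a plain function. This is only a simplified model for the | operator: the names evaluate_or and Tagger are made up for illustration, and the real interpreter handles extra cases that are left out here (for example, a right operand whose type is a subclass of the left operand's type gets priority).

```python
def evaluate_or(lhs, rhs):
    """Simplified model of how Python evaluates `lhs | rhs`."""
    # Special methods are looked up on the type, not on the instance.
    forward = getattr(type(lhs), "__or__", None)
    if forward is not None:
        result = forward(lhs, rhs)
        if result is not NotImplemented:
            return result

    # The forward method is missing or gave up: try the reverse method.
    reverse = getattr(type(rhs), "__ror__", None)
    if reverse is not None:
        result = reverse(rhs, lhs)
        if result is not NotImplemented:
            return result

    raise TypeError(
        "unsupported operand type(s) for |: "
        f"{type(lhs).__name__!r} and {type(rhs).__name__!r}"
    )


class Tagger:
    """Hypothetical right operand that only implements the reverse method."""

    def __ror__(self, lhs):
        return ("tagged", lhs)


print(evaluate_or(6, 3))         # handled by int.__or__
print(evaluate_or(2, Tagger()))  # int.__or__ gives up, Tagger.__ror__ runs
```

The second call is the interesting one: it reaches the reverse method only because the integer's forward method declines the operand.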

So it seems possible… Let’s run a quick experiment.

class Adder:
    def __init__(self, rhs: int) -> None:
        self._rhs = rhs

    def __ror__(self, lhs: int) -> int:
        return lhs + self._rhs


assert 5 == 2 | Adder(3)  # Is 5 equal to 2 + 3? Yes!!

This works because the | operator of the integer 2 checks the type of Adder(3), finds that it is not something it recognizes, and returns NotImplemented, so Python falls back to our reverse magic method.

In C++, the | operator is overloaded on range adaptors to accept ranges. So maybe we can make something similar: an object that implements __ror__, accepts an iterable, and returns another value (probably an iterator).
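
Before building on this idea, it is worth checking that common left operands really do fall through to __ror__. A minimal sketch follows, where Collect is a made-up class for illustration. One caveat to keep in mind: a left operand whose own __or__ accepts arbitrary values would never reach our reverse method.

```python
class Collect:
    """Hypothetical pipe target: gathers any iterable into a sorted list."""

    def __ror__(self, values):
        return sorted(values)


# int.__or__ does not know Collect, so it gives up politely.
assert (2).__or__(Collect()) is NotImplemented

# set and dict define their own `|`, but only for their own kind of operand.
assert {3, 1, 2} | Collect() == [1, 2, 3]
assert {"b": 1, "a": 2} | Collect() == ["a", "b"]

# range and generators define no `|` at all, so __ror__ is used directly.
assert range(3) | Collect() == [0, 1, 2]
assert (n * n for n in (3, 1, 2)) | Collect() == [1, 4, 9]
```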

Pipe-able Higher Order Function

So, back to our motivation: Python already has something like filter, map, and reduce, and also the powerful generator expression, which can filter and/or map without an explicit function call.

values = filter(lambda v: v % 2 == 0, range(10))

values = (v for v in range(10) if v % 2 == 0)

But it’s just hard to chain multiple operations together while preserving readability.

So let’s make a filter object that supports the | operator.

from collections.abc import Callable, Iterable, Iterator


class Filter:
    def __init__(self, predicate: Callable[[int], bool]) -> None:
        self._predicate = predicate

    # a generator method: values are filtered lazily, one at a time
    def __ror__(self, values: Iterable[int]) -> Iterator[int]:
        for value in values:
            if self._predicate(value):
                yield value


selected = range(10) | Filter(lambda val: val % 2 == 0)
assert [0, 2, 4, 6, 8] == list(selected)

How about map?

from collections.abc import Callable, Iterable, Iterator


class Mapper:
    def __init__(self, transform: Callable[[int], int]) -> None:
        self._transform = transform

    # a generator method: each value is transformed only when requested
    def __ror__(self, values: Iterable[int]) -> Iterator[int]:
        for value in values:
            yield self._transform(value)


processed = range(3) | Mapper(lambda val: val * 2)
assert [0, 2, 4] == list(processed)

Works well, we are great again!!
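
Since both __ror__ methods are generators, the chained pipeline should stay lazy, just like the C++ views. The sketch below repeats the two classes so it can run on its own and feeds them an endless input; the calls list and the square helper are additions for illustration only.

```python
from collections.abc import Callable, Iterable, Iterator
from itertools import count, islice


class Filter:
    def __init__(self, predicate: Callable[[int], bool]) -> None:
        self._predicate = predicate

    def __ror__(self, values: Iterable[int]) -> Iterator[int]:
        for value in values:
            if self._predicate(value):
                yield value


class Mapper:
    def __init__(self, transform: Callable[[int], int]) -> None:
        self._transform = transform

    def __ror__(self, values: Iterable[int]) -> Iterator[int]:
        for value in values:
            yield self._transform(value)


calls = []


def square(value: int) -> int:
    calls.append(value)  # record which inputs were actually transformed
    return value * value


# count() never ends, so this only works if nothing is evaluated eagerly.
pipeline = count() | Filter(lambda val: val % 3 == 0) | Mapper(square)
assert calls == []  # building the pipeline did no work yet

first_four = list(islice(pipeline, 4))
assert first_four == [0, 9, 36, 81]
assert calls == [0, 3, 6, 9]  # only the requested values were computed
```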

It just takes some time for us to write a class for filter, map, reduce, take, any … and any other higher-order function you may find useful.

Wait, that looks so tedious. Python is supposed to be a powerful language, isn’t it?

Piper and Decorators

Capturing the function and implementing __ror__ for every higher-order function can be annoying. If we make sure __ror__ only takes the left operand and returns the return value of the captured function, then we can extract a common Piper class. We just need another function that produces a function which has already captured the required logic.

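from collections.abc import Callable, Iterable, Iterator
from typing import Generic, TypeVar

_T = TypeVar("_T")
_U = TypeVar("_U")


class Piper(Generic[_T, _U]):
    def __init__(self, func: Callable[[_T], _U]) -> None:
        self._func = func

    # the left operand of "|" becomes the only argument of the captured function
    def __ror__(self, lhs: _T) -> _U:
        return self._func(lhs)


def filter_wrapped(predicate: Callable[[_T], bool]) -> Piper[Iterable[_T], Iterator[_T]]:
    def apply(items: Iterable[_T]) -> Iterator[_T]:
        for item in items:
            if predicate(item):
                yield item

    return Piper(apply)


selected = range(10) | filter_wrapped(lambda val: val % 2 == 0)
assert [0, 2, 4, 6, 8] == list(selected)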

Now it looks a little nicer … but do we still need to implement a wrapper function for every kind of operation?

Again, the only difference between these wrapper functions is the logic inside the apply function, so we can extract this part again, with a decorator!! :D

# Piper, TypeVar and _T are reused from the previous snippet
from typing import Concatenate, ParamSpec

_R = TypeVar("_R")
_P = ParamSpec("_P")


def on(func: Callable[Concatenate[_T, _P], _R]) -> Callable[_P, Piper[_T, _R]]:
    def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> Piper[_T, _R]:
        def apply(head: _T) -> _R:
            return func(head, *args, **kwargs)

        return Piper(apply)

    return wrapped


# note: this intentionally shadows the built-in filter
@on
def filter(items: Iterable[_T], predicate: Callable[[_T], bool]) -> Iterator[_T]:
    for item in items:
        if predicate(item):
            yield item


selected = range(10) | filter(lambda val: val % 2 == 0)
assert [0, 2, 4, 6, 8] == list(selected)

The on decorator accepts some function func and returns a function that first takes the tail arguments of func, and then returns an object that accepts the head argument of func through the pipe operator.

So now we can express our thoughts in our codebase using pipeline style code, just with one helper class and one helper decorator! :D

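# a pipe-able map, defined like filter above (it shadows the built-in map)
@on
def map(items: Iterable[_T], transform: Callable[[_T], _R]) -> Iterator[_R]:
    for item in items:
        yield transform(item)


values = range(10)
result = (
    values
    | filter(lambda val: val % 2 == 0)
    | map(str)
    | on(lambda chunks: "".join(chunks))()  # create pipe-able object on the fly
)
assert result == "02468"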

or

for val in range(10) | filter(lambda val: val % 2 == 0):
    print(val)

Appendix

Complete type-safe code here

"""
pipe is a module that makes it easy to write higher-order pipeline functions
"""

from collections.abc import Callable, Iterable, Iterator
from typing import Generic, TypeVar, ParamSpec, Concatenate

_R = TypeVar("_R")
_T = TypeVar("_T")
_P = ParamSpec("_P")


class Piper(Generic[_T, _R]):
    """
    Piper[T, R] is a function that accepts T and returns R

    call the piper with "value_t | piper_t_r"
    """

    def __init__(self, func: Callable[[_T], _R]) -> None:
        self._func = func

    def __ror__(self, items: _T) -> _R:
        return self._func(items)


def on(func: Callable[Concatenate[_T, _P], _R]) -> Callable[_P, Piper[_T, _R]]:
    """
    "on" decorates a func into a pipe-style function.

    The resulting function first takes the arguments, excluding the first,
    and returns an object that takes the first argument through the "|" operator.
    """

    def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> Piper[_T, _R]:
        def apply(head: _T) -> _R:
            return func(head, *args, **kwargs)

        return Piper(apply)

    return wrapped

Introduction of Function Hijacking in C

Thanks to lazy symbol loading in Unix environments, we can do many interesting things to functions from a shared library when running an executable.

All we need to do is

  • Implement a shared library that contains the functions we want to hijack.
  • Run the executable with our magic library inserted.

Make a Shared Library

If we want to replace some function with a stub / fake implementation, we can just implement a function with the same name and the same signature.

For example, if we want to fix the clock during unit tests …

// in hijack.c

#include <time.h>

// a "time" function which always returns the timestamp of the epoch.
time_t time(time_t* arg) {
    if (arg)
        *arg = 0;

    return 0;
}

If we want to observe some function call but still delegate the call to the original function, we can implement a function that loads the original function at runtime and passes the call on to it.

For example, if we want to monitor the sequence of file-open calls:

// in hijack.c

// glibc only declares RTLD_NEXT when _GNU_SOURCE is defined before the includes
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>
#include <time.h>

// write "open" action to standard error before opening the file
FILE* fopen(const char* restrict path, const char* restrict mode) {
    static int used_count = 0;
    used_count += 1;
    fprintf(stderr, "open file [%d]: \"%s\"\n", used_count, path);

    typedef FILE* (*wrapped_type)(const char* restrict path, const char* restrict mode);
    // no dlopen, just search the function in magic handle RTLD_NEXT
    wrapped_type wrapped = (wrapped_type)dlsym(RTLD_NEXT, "fopen");
    return wrapped(path, mode);
}

After finishing our implementation, compile it as a shared library, called hijack.so here. (With glibc older than 2.34, dlsym lives in a separate libdl, so append -ldl to the command.)

cc -fPIC -shared -o hijack.so hijack.c

Hijack during Actual Execution

We can use the LD_PRELOAD environment variable to insert our special shared library into any executable during execution.

LD_PRELOAD="path-to-shared-lib" executable

For example, suppose we want to use the implementations from the last section in our executable, called app here.

// app.c

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

int main() {
    FILE* handle = fopen("output.txt", "a");
    assert(handle);
    fclose(handle);

    time_t current = time(NULL);
    printf("now: %s", asctime(gmtime(&current)));

    return EXIT_SUCCESS;
}

(Compile and) run the executable

cc -o app app.c
LD_PRELOAD=./hijack.so ./app

Output

open file [1]: "output.txt"
now: Thu Jan  1 00:00:00 1970

The open-file action is traced, and the time is fixed to the epoch.

If we need to override functions with more than one shared library, just use : to separate them in LD_PRELOAD.
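
When the hijacked executable is launched from a test script, the same variable can be assembled programmatically. Below is a minimal sketch in Python, assuming the ./app and ./hijack.so files from above. The helpers preload_env and run_with_preload are hypothetical names introduced here, not part of any library.

```python
import os
import subprocess


def preload_env(libraries, base=None):
    """Return a copy of the environment with LD_PRELOAD listing `libraries`."""
    env = dict(os.environ if base is None else base)
    paths = [os.path.abspath(lib) for lib in libraries]
    existing = env.get("LD_PRELOAD")
    if existing:
        paths.append(existing)  # keep whatever was already preloaded
    env["LD_PRELOAD"] = ":".join(paths)  # ":" separates multiple libraries
    return env


def run_with_preload(command, libraries):
    """Run `command` with the given shared libraries inserted."""
    return subprocess.run(
        command,
        env=preload_env(libraries),
        capture_output=True,
        text=True,
        check=False,
    )


# Example (requires the files built above):
# result = run_with_preload(["./app"], ["./hijack.so"])
# print(result.stdout, result.stderr)
```

Absolute paths are used so that the preload still works if the child process is started from a different working directory.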

Conclusion

It’s a powerful feature: it allows us to observe calls, to replace functions with mock/fake implementations, or sometimes even to apply an emergency patch.

What makes all of this possible is the dynamic linking mechanism, and the one-to-one mapping of symbols from sources to libraries/binaries.

Although development in C is somewhat inconvenient, it’s still an interesting experience to see this kind of usage. :D

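Setting Up a SOCKS Proxy with SSH

Because of the pandemic, we have recently started working from home again. The company provides some VPN solutions for employees to reach the internal network, but some service back ends hosted on public clouds restrict the source IP, so they cannot be accessed directly from home.

This is where SSH’s built-in SOCKS proxy server comes in handy!

When establishing a connection, SSH can also open a SOCKS (version 4 and 5) server on the local machine. Any application can then send arbitrary TCP connections through this SOCKS server; they are forwarded to the SSH server, which then connects to the destination. Since everyone surely has a machine at the office that they can SSH into (?), admin back ends restricted to company IPs become reachable. :D

Usage is simple: just pass a few extra options when connecting with SSH.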

ssh "target-machine" -D "localhost:1080" -N
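  • -D localhost:1080: sets the local address and port for the SOCKS server; the RFC suggests 1080
  • -N: if you don’t need a shell and only want the SOCKS proxy, add this option as well

Note: SSH supports SOCKS5 (which can proxy IPv6) but not authentication. However, since the SOCKS server can be bound to localhost only, as configured above, this is not much of a problem.

Next, we can configure the proxy at the OS level or the application level to use it! For the problem I ran into at the beginning, I usually open a separate Firefox and configure it to use the proxy for the company’s various admin back ends. This way all other network traffic still goes out directly, without passing through the proxy. :D

To start the proxy quickly, you can use Windows Terminal and set up a profile that runs the SSH command above.

PuTTY, the most widely used SSH client on Windows, also supports the SOCKS proxy feature; see: How To Set up a SOCKS Proxy Using Putty & SSH - Security Musings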


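How to Avoid Typos in Commit Messages?

A typo in a document is no big deal; it can be fixed at any time. A typo in a commit message, however, will be passed down through the ages.

As an RD, there is a minimalist solution.. just put the following line into your .vimrc. (O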

autocmd FileType gitcommit setlocal spell

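(Automatically enables spell checking for buffers that hold a Git commit message)

Once this is set, any word vim does not recognize is highlighted in an eye-catching color, reminding you to go back and check whether you have misspelled something again.

Of course, for a comfortable spell-checking experience, maintaining the dictionary file is also an important part. But that is another topic..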
