Pipeline Style Map-Reduce in Python

Since C++20, C++ provides a new style of data processing: ranges, which enable lazy evaluation by chaining one view onto another.

#include <ranges>
#include <vector>

int main() {
    std::vector<int> input = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    auto output = input | std::views::filter([](const int n) { return n % 3 == 0; })
                        | std::views::transform([](const int n) { return n * n; });
    // now output is [0, 9, 36, 81], conceptually
}

Can we use this style in Python? Yes! :D

Evaluation of Operators

In Python, any expression involving a binary operator, e.g. a + b, is evaluated by the following rules:

  • If the (forward) special method, e.g. __add__, exists on the left operand
    • It’s invoked on the left operand, e.g. a.__add__(b)
    • If the invocation returns some meaningful value other than NotImplemented, we are done!
  • If the forward special method does not exist, or the invocation returns NotImplemented, then
    • If the reverse special method, e.g. __radd__, exists on the right operand
      • It’s invoked on the right operand, e.g. b.__radd__(a)
      • If the invocation returns some meaningful value other than NotImplemented, we are done!
  • Otherwise, TypeError is raised
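
A minimal sketch of these rules in action (the Left and Right classes here are illustrative):

```python
class Left:
    def __add__(self, other):
        # decline the operation, so Python falls back to the right operand
        return NotImplemented


class Right:
    def __radd__(self, other):
        # the reverse special method, invoked as b.__radd__(a)
        return "handled by Right.__radd__"


# Left.__add__ returns NotImplemented, so Right.__radd__ is tried next
result = Left() + Right()
assert result == "handled by Right.__radd__"
```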

So it seems possible here… Let’s run a quick experiment.

class Adder:
    def __init__(self, rhs: int) -> None:
        self._rhs = rhs

    def __ror__(self, lhs: int) -> int:
        return lhs + self._rhs


assert (2 | Adder(3)) == 5  # Is 2 | Adder(3) equal to 2 + 3? Yes!!

This works because the | operator of the integer 2 checks the type of Adder(3), finds it is not something it recognizes, returns NotImplemented, and then our reverse special method takes over.
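
We can check this fallback directly; here is a sketch that re-states the Adder class so it runs standalone:

```python
class Adder:  # same Adder as in the experiment above
    def __init__(self, rhs: int) -> None:
        self._rhs = rhs

    def __ror__(self, lhs: int) -> int:
        return lhs + self._rhs


# int's own __or__ does not recognize Adder, so it declines the operation ...
assert (2).__or__(Adder(3)) is NotImplemented
# ... and Python falls back to the reverse method, Adder.__ror__
assert (2 | Adder(3)) == 5
```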

In C++, the | operator is overloaded on range adaptors to accept ranges. So maybe we can make something similar: an object that implements __ror__, accepts an iterable, and returns another value (probably an iterator).

Pipe-able Higher-Order Functions

So back to our motivation: Python already has filter, map, and reduce, as well as the powerful generator expression for filtering and/or mapping without an explicit function call.

values = filter(lambda v: v % 2 == 0, range(10))
values = (v for v in range(10) if v % 2 == 0)

But it’s just hard to chain multiple operations together while preserving readability.
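
For example, reproducing the C++ pipeline from the introduction with plain filter and map forces the reader to parse the expression inside-out:

```python
# filter multiples of three, then square them: the first step ends up innermost
result = list(map(lambda val: val * val, filter(lambda val: val % 3 == 0, range(11))))
assert result == [0, 9, 36, 81]
```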

So let’s make a filter object that supports the | operator.

from collections.abc import Callable, Iterable, Iterator


class Filter:
    def __init__(self, predicate: Callable[[int], bool]) -> None:
        self._predicate = predicate

    def __ror__(self, values: Iterable[int]) -> Iterator[int]:
        for value in values:
            if self._predicate(value):
                yield value


selected = range(10) | Filter(lambda val: val % 2 == 0)
assert [0, 2, 4, 6, 8] == list(selected)

How about map?

from collections.abc import Callable, Iterable, Iterator


class Mapper:
    def __init__(self, transform: Callable[[int], int]) -> None:
        self._transform = transform

    def __ror__(self, values: Iterable[int]) -> Iterator[int]:
        for value in values:
            yield self._transform(value)


processed = range(3) | Mapper(lambda val: val * 2)
assert [0, 2, 4] == list(processed)

Works well, we are great again!!

It just takes some time for us to write a class representation for filter, map, reduce, take, any… and any other higher-order function you may find useful.
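
As a taste of that, a reduce in the same style might look like this (a sketch; Reducer is a hypothetical name following the same __ror__ pattern, with functools.reduce doing the folding):

```python
import functools
from collections.abc import Callable, Iterable


class Reducer:
    def __init__(self, combine: Callable[[int, int], int], initial: int) -> None:
        self._combine = combine
        self._initial = initial

    def __ror__(self, values: Iterable[int]) -> int:
        # fold the incoming iterable into a single value
        return functools.reduce(self._combine, values, self._initial)


total = range(10) | Reducer(lambda acc, val: acc + val, 0)
assert total == 45
```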

Wait, it looks so tedious. Python is supposed to be a powerful language, isn’t it?

Piper and Decorators

The function capturing and the __ror__ implementation can be annoying to repeat for every higher-order function. If we can make sure __ror__ only takes the left operand and returns the return value of the captured function, then we can extract a common Piper class. We just need another function to produce a function that already captures the required logic.

from collections.abc import Callable, Iterable, Iterator
from typing import Generic, TypeVar

_T = TypeVar("_T")
_U = TypeVar("_U")


class Piper(Generic[_T, _U]):
    def __init__(self, func: Callable[[_T], _U]) -> None:
        self._func = func

    def __ror__(self, lhs: _T) -> _U:
        return self._func(lhs)


def filter_wrapped(predicate: Callable[[_T], bool]) -> Piper[Iterable[_T], Iterator[_T]]:
    def apply(items: Iterable[_T]) -> Iterator[_T]:
        for item in items:
            if predicate(item):
                yield item

    return Piper(apply)


selected = range(10) | filter_wrapped(lambda val: val % 2 == 0)
assert [0, 2, 4, 6, 8] == list(selected)

Now it looks a little nicer… but do we still need to implement wrapper functions for all kinds of operations?

Again, the only difference between these wrapped functions is the logic inside the apply function, so we can extract this part too, with a decorator!! :D

from collections.abc import Callable, Iterable, Iterator
from typing import Concatenate, ParamSpec, TypeVar

_T = TypeVar("_T")
_R = TypeVar("_R")
_P = ParamSpec("_P")


def on(func: Callable[Concatenate[_T, _P], _R]) -> Callable[_P, Piper[_T, _R]]:
    def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> Piper[_T, _R]:
        def apply(head: _T) -> _R:
            return func(head, *args, **kwargs)

        return Piper(apply)

    return wrapped


@on
def filter(items: Iterable[_T], predicate: Callable[[_T], bool]) -> Iterator[_T]:
    for item in items:
        if predicate(item):
            yield item


selected = range(10) | filter(lambda val: val % 2 == 0)
assert [0, 2, 4, 6, 8] == list(selected)

The on decorator accepts some function func and returns a function that first takes the tail arguments of func, then returns an object that accepts the head argument of func through the pipe operator.

So now we can express our thoughts in our codebase using pipeline-style code, with just one helper class and one helper decorator! :D

values = range(10)
result = (
    values
    | filter(lambda val: val % 2 == 0)
    | map(str)
    | on(lambda chunks: "".join(chunks))()  # create a pipe-able object on the fly
)
assert result == "02468"

or

for val in range(10) | filter(lambda val: val % 2 == 0):
    print(val)

Appendix

The complete type-safe code:

"""
pipe is a module that make it easy to write higher-order pipeline function
"""

from collections.abc import Callable, Iterable, Iterator
from typing import Generic, TypeVar, ParamSpec, Concatenate

_R = TypeVar("_R")
_T = TypeVar("_T")
_P = ParamSpec("_P")


class Piper(Generic[_T, _R]):
"""
Piper[T, R] is a function that accept T and return R

call the piper with "value_t | piper_t_r"
"""

def __init__(self, func: Callable[[_T], _R]) -> None:
self._func = func

def __ror__(self, items: _T) -> _R:
return self._func(items)


def on(func: Callable[Concatenate[_T, _P], _R]) -> Callable[_P, Piper[_T, _R]]:
"""
"on" decorates a func into pipe-style function.

The result function first takes the arguments, excluding first,
and returns an object that takes the first argument through "|" operator.
"""

def wrapped(*args: _P.args, **kwargs: _P.kwargs) -> Piper[_T, _R]:
def apply(head: _T) -> _R:
return func(head, *args, **kwargs)

return Piper(apply)

return wrapped

Introduction to Function Hijacking in C

Thanks to lazy symbol binding in Unix-like environments, we can do many interesting things to the functions an executable pulls in from shared libraries.

All we need to do is:

  • Implement a shared library that contains the functions we want to hijack.
  • Run the executable with our magic library inserted.

Make a Shared Library

If we want to replace some function with a stub / fake implementation, we can just implement a function with the same name and the same signature.

For example, if we want to fix the clock during unit tests…

// in hijack.c

#include <time.h>

// a "time" function which always returns the timestamp of the epoch
time_t time(time_t* arg) {
    if (arg)
        *arg = 0;

    return 0;
}

If we want to observe some function calls, but still delegate each call to the original function, we can implement a function that looks up the corresponding original at runtime and forwards the call.

For example, if we want to monitor the sequence of file-open calls.

// in hijack.c

// RTLD_NEXT is a GNU extension, so expose it before including <dlfcn.h>
#define _GNU_SOURCE
#include <dlfcn.h>
#include <stdio.h>

// write an "open" action to standard error before opening the file
FILE* fopen(const char* restrict path, const char* restrict mode) {
    static int used_count = 0;
    used_count += 1;
    fprintf(stderr, "open file [%d]: \"%s\"\n", used_count, path);

    typedef FILE* (*wrapped_type)(const char* restrict, const char* restrict);
    // no dlopen, just search for the function via the magic handle RTLD_NEXT
    wrapped_type wrapped = (wrapped_type)dlsym(RTLD_NEXT, "fopen");
    return wrapped(path, mode);
}

After finishing our implementation, compile it as a shared library, called hijack.so here. (On older glibc you may also need to link with -ldl for dlsym.)

cc -fPIC -shared -o hijack.so hijack.c

Hijack during Actual Execution

We can use the LD_PRELOAD environment variable to insert our special shared library into any executable during execution.

LD_PRELOAD="path-to-shared-lib" executable

For example, let’s use the implementations from the last section with an executable, called app here.

// app.c

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

int main() {
    FILE* handle = fopen("output.txt", "a");
    assert(handle);
    fclose(handle);

    time_t current = time(NULL);
    printf("now: %s", asctime(gmtime(&current)));

    return EXIT_SUCCESS;
}

(Compile and) run the executable

cc -o app app.c
LD_PRELOAD=./hijack.so ./app

Output

open file [1]: "output.txt"
now: Thu Jan  1 00:00:00 1970

The open-file action is traced, and the time is fixed to the epoch.

If we need to override functions with more than one shared library, just use : to separate them in LD_PRELOAD.
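
For example (the library names here are illustrative):

```shell
# preload two hijack libraries at once, searched left to right
LD_PRELOAD="./hijack_time.so:./hijack_fopen.so" ./app
```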

Conclusion

It’s a powerful feature: it allows us to observe, to replace functions with mock/fake implementations, or sometimes even to apply an emergency patch.

What makes this possible are the dynamic linking mechanism and the one-to-one mapping of symbols from sources to libraries/binaries.

Although development in C is somewhat inconvenient, it’s still an interesting experience to see this kind of usage. :D

New VS Code Feature: Remote Repositories

In VS Code 1.57, the Remote Development extension family gained a new member: Remote Repositories.

With this extension, when you come across a project you want to read right away, you can open it directly in VS Code, with no need to clone it into a local folder first.

However, because this extension actually builds a Virtual Workspace and places the code inside it for browsing, a workspace opened with Remote Repositories is very limited in functionality. Debugging, the terminal, and most extensions basically cannot be used. That said, when you have read enough code and want to start making deeper changes and debugging, it does provide a way to convert the workspace into a regular one. Very convenient to use!

Unfortunately, the only kind of remote repository this extension currently supports is GitHub. And like the other members of the Remote Development series, this extension is not an open source project:

Whether it will ever support Git repositories other than GitHub, or even other kinds of VCS, depends entirely on Microsoft.