w3ctech

[译]CSP and transducers in JavaScript

原文见:http://phuu.net/2014/08/31/csp-and-transducers.html

CSP和JavaScript的transducer(直译是传感器,但是其实是transform和reducer的合成词,transform是变换,reducer就是JS中的reducer,也就是说transducer的意思是把变换和过滤结合起来)

我在学习 Clojure 的过程中产生了许多有趣的想法。我认为 尝试新东西 真的很重要 ,因此在这篇文章中我试图结合两个东西——一个新一个旧:C.霍尔的 通信顺序进程 (CSP)中提到的 Clojure/core.async ,和Rich Hickey介绍过的 transducer ,后者是Clojure中的新特性。

写这篇文章有两个目的:通过讲述来巩固我对于这些想法的理解并——顺便——帮助其他人理解它们。为了防止出错,我会尽量讲清楚每一个概念,尤其是transducer。这篇文章很长,我希望它也能很有用。

tl;dr —— 文中出现的代码都托管在GitHub上

首先我们需要来讲解一些基本的概念......

CSP?

CSP 是一种描述并发系统通信的正规方式。这个解释听起来似乎很无聊,确实如此——但是就像其他非常无聊的概念一样,一旦应用在现实世界中就会产生许多有趣的东西。有点像酸奶(作者的意思应该是把无聊的概念应用到现实世界中就像牛奶发酵成酸奶一样,会产生非常神奇的变化——译者注)。

core.async?

一年前,CSP的具体实现 core.async 出现在Clojure社区,能够实现“异步编程和通讯”。CSP提出了 通道 这个概念,使用通道可以非常 简单的 调度 系统中实体 。这个库是编译无关的,因此也可以在 ClojureScript 中使用。

Transducer?

在这个史诗级领域中(几乎跨越 40年 计算历史!) ,最新的进展就是 transducer ,一种“强大且可组合的算法变换方式”。这个解释同样很无聊,但是在实际使用中非常强大——而且对我来说很难理解!

我对 如何结合这些概念 很感兴趣,并且我一直在用 ClojureScript大卫·诺伦 开发的优秀的 Om 框架来 进行各种尝试 。此外,这些想法与Twitter的 Flight框架 关系密切,后者正是我的工作。

不过,我一直没有完全理解底层的原理, 亲自去做 是学习的最好办法,因此我一直在不断尝试!

哦,说明一下——我不会花太多时间解释 为什么 你需要这个东西,你可以在上面的很多链接中找到答案。

不要使用事件

可以把通道、transducer和许多东西结合起来,但是这里我只强调一点: 在数据流中不要使用事件 。事件会让你的代码中包含许多可变状态,并且通过事件传递数据并不是最佳实践:

pubsub.on('users:response', function (users) {
    users
        .filter(function (user) {
            return !user.muted;
        })
        .forEach(function (user) {
            pubsub.emit('posts:request', {
                user: user.id
            });
        })
    })
});

pubsub.on('posts:response', function (data) {
    ...
});

pubsub.emit('users:request');

的确,事件很适合一次性的通知,但是如果你想调度大量数据源就会出问题。此外,事件处理程序往往不能复用或者组合。

core.async是一种可选的数据流处理方式,它非常优秀,可以重用和组合。

大卫诺伦的文章会 告诉你原因

JavaScript中的通道

首先我们来实现core.async通道以及通道相关的基础操作: puttake

通道非常简单:它们允许生产者和消费者向通道中 puttake 数据。通道的默认行为是“后进先出” —— take 会从通道中返回最后一次 put 的值,你必须显式 take 来获取下一个值。通道很像队列。

显然,这种模式可以解耦生产者和消费者——它们只需要知道目标通道就够了。此外,通道是多对多的:支持多个生产者 put ,多个消费者 take

这里不会介绍 具体的实现 ,不过简单来说,创建一个通道只需要 var c = chan()

你可以在这个JS Bin中尝试一下使用通道:

不知道该怎么用?可以尝试下面的命令:

> c = chan()
...
> chan.put(c, 10)
...
> chan.take(c, console.log.bind(console, 'got: '))
got: 10
...
> chan.take(c, console.log.bind(console, 'got: '))
...
> chan.put(c, 20)
got: 20
...

很好。我 添加了一些代码 ,不过基本的思路没变。

顺便说一句......上面提到的概念都来源于函数式编程,因此我会尽量避免在对象中定义方法,而是定义可以操作简单数据结构的函数。

现在我们有可以正常工作的通道了!

JS中的Transducer

上面说过,传感器是一种“强大且可组合的算法变换方式”。虽然听起来很厉害,但是这句话无法传达给我们更多的信息。Rich Hickey的 文章 中有详细介绍,不过我还是觉得很难理解。

事实上,理解这个概念可能需要你花费几个小时。

对我来说,Transducer是一种通用并且 可组合 的处理值集合的方式,可以产生一个新值或新的值集合。单词‘transducer’可以分割成两部分:‘transform’——把某个值变换成其他值——和‘reducer’——组合一个数据结构中的值并生成一个新值。

我会通过一个具体的例子来讲解transducer,我们会逐渐把它变得更加通用。

我们的目标是找到一种 “可组合的实现算法变换的方式”

我希望你对此很感兴趣。

由下而上...

首先,我们必须认识到,数组(或其他类型的集合)上的许多操作比如 mapfilterreverse 都可以用 reduce 实现。

下面的例子会将数组中的所有值加1:

[1,2,3,4].map(function (input) {
    return input + 1;
}) // => [2,3,4,5]

非常简单是吧。有两点需要注意:

  • 返回值是从一个新的空数组开始构建的。
  • 每个返回值都会被加入新数组的尾部,你可以手动调用JavaScript的 concat 来实现。

基于以上两点,我们可以使用 .reduce 来重写这个例子:

[1,2,3,4].reduce(function (result, input) {
    return concat(result, input + 1);
}, []) // => [2,3,4,5]

考虑到JavaScript中Array的 concat 不太好用,我重新定义了一个名为 concat 的函数,这个函数可以把一个值加入一个数组:

function concat(a, b) {
    return a.concat([b]);
}

顺便说一句,我们即将进入高阶函数领域。如果你有任何不适,请 阅读一些相关材料

上面这个使用reduce实现map功能的例子不太通用,我们可以封装一层函数:

function mapWithIncr(collection) {
    return collection.reduce(function (result, input) {
        return concat(result, input + 1);
    }, []);
}

mapWithIncrement([1,2,3,4]) // => [2,3,4,5]

更进一步,我们可以传入一个变换函数。本例中这个变换函数是 inc

更进一步,我们可以传入一个变换函数。本例中这个变换函数是 inc :

如果要把这个函数应用在任何集合上,就需要另一个高阶函数 map

事情终于开始变得有趣起来了:这个函数包含了 map本质 ——我们通过 变换连接 值把一个集合 reduce 成了另一个集合。

如果要把这个函数应用在任何集合上,就需要另一个高阶函数 map 。

事情终于开始变得有趣起来了:这个函数包含了 map 的 本质 ——我们通过变换 和 连接 值把一个集合 reduce 成了另一个集合。

这个函数的使用方法如下:

map(inc, [1,2,3,4]) // => [2,3,4,5]

非常好。

算法变换

那么,下一步如何抽象?是时候重申一下我们的目标了:“可组合的实现算法变换的方式”。这里有两个关键短语:“算法变换”和“可组合”。我们会依次进行叙述。

上面定义的 map 是一种算法变换。另一种是我之前提到的 filter ,下面我们来看看如何用相同的方式定义它。

实际上Filter更符合单词”reduce“的意思,因为它生成的输出确实会比输入少(也有可能数量不变,比如全部保留——译者注)。

我们会通过一个具体的例子来看看如何用reduce实现通用的filter函数,从中我们可以理解filter的 本质 是什么。

// 基础过滤器
[1,2,3,4].filter(function (input) {
    return (input > 2);
}) // => [3,4]

// 用reduce实现的过滤器
[1,2,3,4].reduce(function (result, input) {
    return (
        input > 2 ?
            concat(result, input) :
            result
    );
}, []) // => [3,4]

// 变换 (也称作预测)
function greaterThanTwo(x) {
    return (x > 2);
}

// 最后的过滤函数
function filter(predicate, collection) {
    return collection.reduce(function (result, input) {
        return (
            predicate(input) ?
                concat(result, input) :
                result
        );
    }, [])
}

使用方法如下:

filter(greaterThanTwo, [1,2,3,4]) // => [3,4]

可组合

现在,我们已经可以构建不同的算法变换,但是我们还没有实现“可组合性”。下面就来解决这个问题。

如何让我们已经定义好算法变换 mapfilter 具备可组合性?有两种方式:

  • 先在整个集合上应用第一个变换,然后应用第二个。
  • 先在第一个元素上应用所有变换,然后处理第二个元素。

我们已经可以实现第一种:

filter(greaterThanTwo, map(inc, [1,2,3,4])) // => [3,4,5]

我们甚至可以使用 compose

var incrementAndFilter = compose(
    filter.bind(null, greaterThanTwo),
    map.bind(null, inc)
);

incrementAndFilter([1,2,3,4]) // => [3,4,5]

compose 函数可以组合多个函数:

compose(f, g)(10) === f(g(10));

然而,这样做有一些问题:

  • 不能实现并行化。
  • 不能实现懒加载。
  • 具体的操作和输入输出数据的结构关联紧密。

第二种组合方式可以很好地解决上述问题,因此更符合我们的要求。

如果想知道第二种方式为什么能够解决这些问题,请阅读 fork-join模型

坦率地说,我发现第二种方式很难实现,我无法理解如何实现通用的组合方式。

是时候深入研究reducing函数了。

Reducing函数

reducing函数就是可以当作参数传入 reduce 的函数。它们有如下形式: (something, input) -> something 。它们是 mapfilter 例子中最内层的函数。

我们的目标就是让这些函数可以组合,但现在它们都隐藏在 mapfilter 中。

function map(transform, collection) {
    return collection.reduce(
        // Reducing function!
        function (result, input) {
            return concat(result, transform(input));
        },
        []
    );
}

function filter(predicate, collection) {
    return collection.reduce(
        // Reducing function!
        function (result, input) {
            return (
                predicate(input) ?
                    concat(result, input) :
                    result
            );
        },
        []
    )
}

为了提取reducing函数,我们需要让map和filter更通用,具体的方法是提取出两者的公共部分:

  • 都使用collection.reduce
  • ‘种子’值都是一个空数组
  • 都会在 结果输入 上执行 concat 操作( 输入可能会被 变换

首先,我们来提取出 collection.reduce 和种子值。我们会 生成reducing函数 并把它们传入 .reduce

function mapper(transform) {
    return function (result, input) {
        return concat(result, transform(input));
    };
}

function filterer(predicate) {
    return function (result, input) {
        return (
            predicate(input) ?
                concat(result, input) :
                result
        );
    };
}

[1,2,3,4].reduce(mapper(inc), []) // => [2,3,4,5]
[1,2,3,4].reduce(filterer(greaterThanTwo), []) // => [3,4]

很好!我们离成功越来越近了,但是还是不能组合多个reducing函数。最后一块公共功能是关键:在 resultinput 上执行 concat 操作。

前面说过,reducing函数的形式是 (something, input) -> something 。concat就是一种reducing函数:

function concat(a, b) {
    return a.concat([b]);
}

也就是说实际上有两个reducing函数:

  • 一个定义任务(映射,过滤,翻转...)
  • 另一个在任务中合并现有 resultinput

到目前为止,对于后者我们只是使用了 concat ,但是没人规定我们必须这么做。我们是否可以用 另一种完全不同的reducing函数 ——比如说由 mapper 生成的函数?

是的,我们可以这么做。

为了组合reducing函数,我们会从一个简单的例子开始,重写 filterer 函数,让它使用 mapper 来合并 输入result结果 并搞清楚数据是如何流动的。

首先我们需要一个新函数: identity 。它只是返回输入的值:

function identity(x) {
    return x;
}

[1,2,3,4].reduce(mapper(identity), []) // => [1,2,3,4]

我们很容易就能使用 mapper 来改写filter:

function lessThanThree(x) {
    return (x < 3);
}

function mapper(transform) {
    return function (result, input) {
        return concat(result, transform(input));
    };
}

function filterer(predicate) {
    return function (result, input) {
        return (
            predicate(input) ?
                mapper(identity)(result, input) :
                result
        );
    };
}

[1,2,3,4].reduce(filterer(lessThanThree), []) // => [1,2]

下面是这段代码的工作原理:

  1. filterer(lessThanThree) 生成一个reducing函数并传入 .reduce
  2. reducing函数接收 当前结果 ——现在是 [] ——和第一个 输入 —— 1
  3. 调用 predicate ,返回 true ,所以对三元表达式的第一个表达式求值。
  4. mapper(identity) 返回reducing函数,然后用 []1 调用它。
    1. reducing函数的 变换函数transform —— identity ——被调用时,返回输入的值。
    2. 输入的值被 concat 连接到 result 上并返回。
  5. 新的结果——现在是 [1] ——被返回, reduce 会继续循环。

我建议你自己运行一下这段代码!

那么这个例子告诉我们什么?现在一个reducing函数可以使用另一个reducing函数——不是必须用concat!

事实上,如果我们用 mapper(inc) 来改写 filterer 会得到:

[1,2,3,4].reduce(filterer(lessThanThree), []) // => [2,3]

这有点像可组合的算法变换了,但我们不希望手动编写组合函数——我们想直接使用 compose

如果我们提取出内层reducing函数(合成器),就可以让reducing函数不再依赖参数的具体组合方法,而是专注于它的本职工作。

改写之后的结果如下:

function mapping(transform) {
    return function (reduce) {
        return function (result, input) {
            return reduce(result, transform(input));
        };
    };
}

function filtering(predicate) {
    return function (reduce) {
        return function (result, input) {
            return (
                predicate(input) ?
                    reduce(result, input) :
                    result
            );
        };
    };
}

这些新的内层函数——使用 reduce 的函数——就是 transducer 。它们可以在不知道 结果 数据结构的情况下封装一些reducing行为。

实际上,我们已经把合并 结果 和变换后 输入 的职责从reducing函数内部转移给了transducer的 使用者 。这意味着我们可以reduce任何数据结构!

我们看看如何用transducer来实现之前的过滤和加一:

var filterLessThanThreeAndIncrement = compose(
    filtering(lessThanThree),
    mapping(inc)
);

[1,2,3,4].reduce(filterLessThanThreeAndIncrement(concat), []) // => [2,3]

哇哦。注意:

  • 使用transducer的时候,我们只需要指定一次种子数据结构。
  • 我们只需要告诉transducer如何合并 输入结果 一次(本例中是使用 concat ),具体方法是把它传入 filterLessThanThreeAndIncrement 这个transducer。

为了证明transducer可以正常工作,我们可以在 不改变reducering函数 的情况下把结果中的值当作一个对象的键。

[1,2,3,4].reduce(filterLessThanThreeAndIncrement(function (result, input) {
    result[input] = true;
    return result;
}), {}) // => { 2: true, 3: true }

我们来尝试一些更复杂的数据。假设我们有几条 微博

var posts = [
    { author: 'Agatha',  text: 'just setting up my pstr' },
    { author: 'Bert',    text: 'Ed Balls' },
    { author: 'Agatha',  text: '@Bert fancy a thumb war?' },
    { author: 'Charles', text: '#subtweet' },
    { author: 'Bert',    text: 'Ed Balls' },
    { author: 'Agatha',  text: '@Bert m(' }
];

我们来找出谁@了谁并构建一个类似图的数据结构。

function graph(result, input) {
    result[input.from] = result[input.from] || [];
    result[input.from].push(input.to);
    return result;
}

var extractMentions = compose(
    // Find mentions
    filtering(function (post) {
        return post.text.match(/^@/);
    }),
    // Build object with {from, to} keys
    mapping(function (post) {
        return {
            from: post.author,
            to: post.text.split(' ').slice(0,1).join('').replace(/^@/, '')
        };
    })
);

posts.reduce(extractMentions(graph), {}) /* =>
    { Agatha:  ['Bert', 'Charles'],
      Bert:    ['Agatha'],
      Charles: ['Bert'] } */

把transducer应用到通道上

现在我们已经能够实现“可组合的算法变换方式”,可以把它们应用在任何数据管道上——我们来尝试一下通道。为了方便理解,我不会给你展示 通道的具体实现 ,只是一些使用示例。

我们要监听DOM事件,并把它们放入一个通道中,这个通道会筛选出那些发生在偶数X和Y位置的事件并把它们映射为三元组 [type, x, y]

首先需要两个辅助的函数:

// 把DOM事件放入给定的通道
function listen(elem, type, c) {
    elem.addEventListener(type, function (e) {
        chan.put(c, e);
    });
}

function even(x) {
    return (x % 2 === 0);
}

接着,我们创建一个通道,并把它传入transducer。transducer会reduce通道中的数据。

var c = chan(
    1, // Fixed buffer size (only one event allowed)
    compose(
        // Only events with even x & y
        filtering(function (e) {
            return (
                even(e.pageX) &&
                even(e.pageY)
            );
        }),
        // e -> [type, x, y]
        mapping(function (e) {
            return [e.type, e.pageX, e.pageY];
        })
    )
);

下一步,我们把事件和通道关联起来:

listen(document, 'mousemove', c);

最后, 通过递归调用来 take 通道中的数据:

(function recur() {
    chan.take(c, function (v) {
        console.log('got', v);
        recur()
    });
}());

运行上面的代码,你会看到console中出现了许多事件信息——但是只有那些x和y位置为偶数的信息:

> got ["mousemove", 230, 156]
> got ["mousemove", 232, 158]
> got ["mousemove", 232, 160]
> got ["mousemove", 234, 162]

带状态的transducer

最后,让我们来看看有状态的transducer。 gateFilter 通过 mousedownmouseup 事件来检测“拖动”, keyFilter 会根据属性来过滤通道中的数据。

function gateFilter(opener, closer) {
    var open = false;
    return function (e) {
        if (e.type === opener) {
            open = true;
        }
        if (e.type === closer) {
            open = false;
        }
        return open;
    };
}

function keyFilter(key, value) {
    return function (e) {
        return (e[key] === value);
    };
}

var c = chan(
    1,
    compose(
        // Only allow through when mouse has been down
        filtering(gateFilter('mousedown', 'mouseup')),
        // Filter by e.type === 'mousemove'
        filtering(keyFilter('type', 'mousemove')),
        // e -> [type, x, y]
        mapping(function (e) {
            return [e.pageX, e.pageY];
        })
    )
);

// Listen for relevant events
listen(document, 'mousemove', c);
listen(document, 'mouseup',   c);
listen(document, 'mousedown', c);

// Take in a loop
(function recur() {
    chan.take(c, function (v) {
        console.log('got', v);
        recur()
    });
}());

哇哦。很酷,不是吗?

最后...

我觉得transducer有很强的表现力,尤其是用来处理大型应用程序中的数据流。

我真正的目标是要分析 Actor模型 ,因为它涉及到前端工程,特别是可以防止规模增加时候的复杂度爆炸。 Flight框架 使用了Actor模型,但我不认为事件是协调行为和流控制的最佳方法——虽然用在一次性通知中非常完美。

这项工作的成果 在Github上 ,供你参考。如果你有什么问题,可以给我发 电子邮件 或者给我发 Tweet

最后的最后,非常感谢 斯图尔特帕西 对这篇文章的支持!

w3ctech微信

扫码关注w3ctech微信公众号

共收到2条回复

  • mark之, 后看.

    回复此楼
  • 好高深,跟go中的channel差不多。。

    回复此楼