反应式编程

反应式编程是处理数据流和变化传播的编程范例。 这意味着,当数据流由一个组件发出时,更改将通过反应式编程库传播到其他组件。变化的传播将持续到最终的接收者。 事件驱动和反应式编程的区别在于,事件驱动式编程围绕事件展开,反应式编程围绕着数据展开。

ReactiveX或RX用于反应式编程
ReactiveX或者Raective Extension是反应式编程最着名的实现。 ReactiveX的工作取决于以下两个类 -

可观察的类
这个类是数据流或事件的来源,它打包传入的数据,以便数据可以从一个线程传递到另一个线程。 在某些观察者订阅它之前,它不会提供数据。

观察员类
该类使用observable发出的数据流。 可以有多个可观察的观察者,每个观察者将接收每个发射的数据项。 观察者可以通过订阅可观察到的三种类型的事件 -

  • on_next()事件 - 它意味着数据流中有一个元素。
  • on_completed()事件 - 它意味着排放已经结束,没有更多数据项到来。
  • on_error()事件 - 它也意味着排放的结束,但在可观察到抛出错误的情况下。

RxPY - 用于反应式编程的Python模块

RxPY是一个Python模块,可用于反应式编程。 我们需要确保模块已安装。 以下命令可用于安装RxPY模块 -

pip install RxPY

例子
以下是一个Python脚本,它使用RxPY模块及Observable类和Observe类来进行反应式编程。 基本上有两类 -

  • get_strings() - 用于从观察者获取字符串。
  • PrintObserver() - 用于从观察者打印字符串。 它使用观察员班的所有三个事件。 它也使用subscribe()类。

参考以下实现代码 -

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

执行上面示例代码,得到以下结果 -

Received Ram
Received Mohan
Received Shyam
Finished

用于反应式编程的PyFunctional库

PyFunctionalis是另一个可用于响应式编程的Python库。 它使我们能够使用Python编程语言创建功能程序。 这很有用,因为它允许我们通过使用链式函数操作符来创建数据管道。

RxPY和PyFunctional之间的区别

这两个库都用于响应式编程,并以类似的方式处理流,但两者的主要区别取决于数据的处理。 RxPY处理系统中的数据和事件,而PyFunctional专注于使用函数式编程范例转换数据。

安装PyFunctional模块

需要在使用之前安装这个模块。可以通过以下pip命令来安装 -

pip install pyfunctional

例子
以下示例使用PyFunctional模块及其seq类,它们充当可以迭代和操作的流对象。 在这个程序中,它使用将每个值加倍的lamda函数映射序列,然后过滤x大于4的值,最后将序列减少为所有剩余值的和。

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

执行上面示例代码,得到以下结果 -

Result: 6

上一篇:事件驱动程序

下一篇:没有了

关注微信小程序
程序员编程王-随时随地学编程

扫描二维码
程序员编程王

扫一扫关注最新编程教程