深圳幻海软件技术有限公司 欢迎您!

Dapr 入门教程之发布订阅

2023-03-20

前面我们了解了如果在Dapr下面进行服务调用,以及最简单的状态管理,本节我们来了解如何启用Dapr的发布/订阅模式,发布者将生成特定主题的消息,而订阅者将监听特定主题的信息。使用发布服务,开发人员可以重复发布消息到一个主题上。Pub/sub组件对这些消息进行排队处理。该主题订阅者将从队列中获取到消息

前面我们了解了如果在 Dapr 下面进行服务调用,以及最简单的状态管理,本节我们来了解如何启用 Dapr 的发布/订阅模式,发布者将生成特定主题的消息,而订阅者将监听特定主题的信息。

  • 使用发布服务,开发人员可以重复发布消息到一个主题上。
  • Pub/sub 组件对这些消息进行排队处理。
  • 该主题订阅者将从队列中获取到消息并处理他们。

接下来我们使用的这个示例包含一个发布者:

  • React 前端消息生成器

包含另外 3 个消息订阅者:

  • Node.js 订阅者
  • Python 订阅者
  • C# 订阅者

Dapr 使用可插拔的消息总线来支持发布-订阅,并将消息传递给 CloudEvents(一个 CNCF 项目) 作为通用的事件信封格式,以提高连接服务的互操作性。

我们这里将使用 Redis Streams(在 Redis version = > 5 中启用),当然也可以使用 RabbitMQ、Kafka 等中间件。下图是用来说明组件之间是如何在本地模式下互相连接的。

dapr pub/sub

本地初始化

Dapr 允许你将相同的微服务从本地机器部署到云环境中去,这里为了和大家说明这种便利性,我们先在本地部署这个实例项目,然后再将其部署到 Kubernetes 环境中去。

要在本地使用 Dapr 服务,需要先在本地初始化 Dapr:

$ dapr init
  • 1.

由于某些网络原因使用上面的命令可能并不能初始化成功,我们可以使用离线的方式进行安装,前往 https://github.com/dapr/installer-bundle/releases 下载对应系统的 Bundle 👝 包,然后解压,比如我这里是 Mac M1,使用下面的命令下载:

$ wget https://github.91chi.fun/https://github.com/dapr/installer-bundle/releases/download/v1.8.4/daprbundle_darwin_arm64.tar.gz
$ tar -xvf daprbundle_darwin_arm64.tar.gz
x daprbundle/
x daprbundle/README.md
x daprbundle/dapr
x daprbundle/details.json
x daprbundle/dist/
x daprbundle/dist/daprd_darwin_arm64.tar.gz
x daprbundle/dist/dashboard_darwin_arm64.tar.gz
x daprbundle/dist/placement_darwin_arm64.tar.gz
x daprbundle/docker/
x daprbundle/docker/daprio-dapr-1.8.4.tar.gz
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.

然后我们可以重新使用下面的命令进行初始化:

$ dapr init --from-dir daprbundle/
  Making the jump to hyperspace...
  Local bundle installation using --from-dir flag is currently a preview feature and is subject to change. It is only available from CLI version 1.7 onwards.
ℹ️  Installing runtime version 1.8.4
  Extracting binaries and setting up components...
Dapr runtime installed to /Users/cnych/.dapr/bin, you may run the following to add it to your path if you want to run daprd directly:
    export PATH=$PATH:/Users/cnych/.dapr/bin
8d7366c22fd8: Loading layer [==================================================>]  3.697MB/3.697MB
61f7f94319f6: Loading layer [==================================================>]  238.6MB/238.6MB
  Extracting binaries and setting up components... Loaded image: daprio/dapr:1.8.4
  Extracting binaries and setting up components...
  Extracted binaries and completed components set up.
ℹ️  daprd binary has been installed to /Users/cnych/.dapr/bin.
ℹ️  dapr_placement container is running.
ℹ️  Use `docker ps` to check running containers.
$ dapr version
CLI version: 1.8.0
Runtime version: 1.8.4
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.

默认会启用 ​​zipkin​​​ 这个 tracing 服务,使用上面的命令初始化如果没有对应的容器,则可以使用 ​​docker run --name dapr_zipkin -d -p 9411:9411 dockerproxy.com/openzipkin/zipkin​​​ 启动该服务。同样也需要运行一个 Redis 服务:​​docker run --name dapr_redis -d -p 6379:6379 dockerproxy.com/redislabs/rejson​​。

dapr containers

消息订阅服务

这里我们还是使用前面使用的 quickstarts 这个项目,克隆项目到本地:

git clone [-b <dapr_version_tag>] https://github.com/dapr/quickstarts.git
  • 1.

进入 tutorials/pub_sub 目录下面:

➜  pub-sub git:(622b7d9) ls
README.md         deploy            makefile          message_b.json    node-subscriber   react-form
csharp-subscriber img               message_a.json    message_c.json    python-subscriber
  • 1.
  • 2.
  • 3.

运行 Node 消息订阅服务

首先我们使用 Dapr 运行 node 消息订阅服务,导航到 node-subscriber 目录,安装依赖:

$ cd node-subscriber
$ npm install  # 或者 yarn
  • 1.
  • 2.

执行如下所示命令运行 node 消息订阅服务:

$ dapr run --app-id node-subscriber --app-port 3000 node app.js
ℹ️  Starting Dapr with id node-subscriber. HTTP Port: 50728. gRPC Port: 50729
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] app is subscribed to the following topics: [A B] through pubsub=pubsub  app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] dapr initialized. Status: Running. Init Elapsed 312.69599999999997ms  app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] placement tables updated, version: 0          app_id=node-subscriber instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4
ℹ️  Updating metadata for app command: node app.js
✅  You're up and running! Both Dapr and your app logs will appear here.
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

上面命令中的 app-id 是微服务的唯一标识符,--app-port 是 Node 应用程序运行的端口,最后,运行应用程序的命令是 node app.js。

运行 Python 消息订阅服务

接下来使用 Dapr 运行 Python 消息订阅服务,导航到 python-subscriber 目录:

$ cd python-subscriber
  • 1.

安装应用依赖:

$ pip3 install -r requirements.txt
  • 1.

同样再次使用 dapr run 来运行该订阅服务:

$ dapr run --app-id python-subscriber --app-port 5001 python3 app.py
ℹ️  Starting Dapr with id python-subscriber. HTTP Port: 55508. gRPC Port: 55509
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] log level set to: info                        app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] enabled gRPC metrics middleware               app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime.grpc.internal type=log ver=1.8.4
INFO[0000] internal gRPC server is running on port 55514  app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] application protocol: http. waiting on port 5001.  This will block until the app is listening on that port.  app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
INFO[0000] application discovered on port 5001           app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
WARN[0000] [DEPRECATION NOTICE] Adding a default content type to incoming service invocation requests is deprecated and will be removed in the future. See https://docs.dapr.io/operations/support/support-preview-features/ for more details. You can opt into the new behavior today by setting the configuration option `ServiceInvocation.NoDefaultContentType` to true.  app_id=python-subscriber instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
== APP ==  * Serving Flask app "app" (lazy loading)
== APP ==  * Environment: production
== APP ==    WARNING: This is a development server. Do not use it in a production deployment.
== APP ==    Use a production WSGI server instead.
== APP ==  * Debug mode: off
== APP ==  * Running on http://127.0.0.1:5001/ (Press CTRL+C to quit)
ℹ️  Updating metadata for app command: python3 app.py
  You're up and running! Both Dapr and your app logs will appear here.
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.

由于我们这里没有 C# 环境,所以只运行 Node 和 Python 这两个消息订阅服务了。

消息发布服务

接下来我们来运行 React 这个前端消息发布服务,同样先导航到 react-form 项目目录下面:

$ cd react-form
  • 1.

然后执行下面的命令安装依赖并构建服务:

$ npm run buildclient
$ npm install
  • 1.
  • 2.

构建完成后可以使用下面的 dapr 命令来启动该前端服务:

$ dapr run --app-id react-form --app-port 8080 npm run start
ℹ️  Starting Dapr with id react-form. HTTP Port: 57303. gRPC Port: 57304
INFO[0000] starting Dapr Runtime -- version 1.8.4 -- commit 18575823c74318c811d6cd6f57ffac76d5debe93  app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
== APP ==
== APP == > react-form@1.0.0 start
== APP == > node server.js
== APP ==
== APP == Listening on port 8080!
INFO[0000] application discovered on port 8080           app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
# ......
INFO[0000] dapr initialized. Status: Running. Init Elapsed 760.39ms  app_id=react-form instance=MBP2022.local scope=dapr.runtime type=log ver=1.8.4
ℹ️  Updating metadata for app command: npm run start
  You're up and running! Both Dapr and your app logs will appear here.

INFO[0001] placement tables updated, version: 0          app_id=react-form instance=MBP2022.local scope=dapr.runtime.actor.internal.placement type=log ver=1.8.4
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.

当看到 == APP == Listening on port 8080! 这样的日志时,表示应用启动成功了。然后我们就可以在浏览器中访问 http://localhost:8080 访问前端应用了。

前端页面

比如现在我们选择消息类型 A,然后随便输入一些消息内容,点击 Submit 发送,然后观察上面的 Node 和 Python 这两个消息订阅者服务的日志。

选择一个主题,输入一些文字,然后发送一条信息!观察通过你们各自的 Dapr 的日志。

Dapr 消息订阅发布服务

注意,Node 订阅者接收类型为 A 和 B 的消息,而 Python 订阅者接收类型为 A和 C 的消息,所以注意每个控制台窗口的日志显示。

此外 Dapr CLI 提供了一个机制来发布消息用于测试,比如我们可以使用如下命令来发布一条消息:

$ dapr publish --publish-app-id react-form --pubsub pubsub --topic A --data-file message_a.json
  • 1.

dapr cli publish

到这里我们就完成了使用 Dapr 来进行消息订阅发布的功能演示。

在 Kubernetes 中运行

上面我们是将演示服务在本地部署的,我们知道使用 Dapr 开发的服务是和平台没关系的,可以很轻松迁移到云环境,比如现在我们再将上面的示例应用部署到 Kubernetes 集群中。

要在 Kubernetes 中运行相同的代码,首先需要设置 Redis 存储,然后部署微服务,将使用相同的微服务,但最终架构有所不同:

运行在K8s

前面我们已经使用 Helm 安装了 bitnami 下面的 redis 应用:

$ helm repo add bitnami https://charts.bitnami.com/bitnami
$ helm repo update
$ helm install redis bitnami/redis
  • 1.
  • 2.
  • 3.

有了 Redis 服务过后,接着我们需要创建一个发布订阅的 Component 组件,前文是创建的一个使用 Redis 的状态管理组件,对应的组件资源清单如下所示:

# deploy/redis.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    # These settings will work out of the box if you use `helm install
    # bitnami/redis`.  If you have your own setup, replace
    # `redis-master:6379` with your own Redis master address, and the
    # Redis password with your own Secret's name. For more information,
    # see https://docs.dapr.io/operations/components/component-secrets .
    - name: redisHost
      value: redis-master:6379
    - name: redisPassword
      secretKeyRef:
        name: redis
        key: redis-password
auth:
  secretStore: kubernetes
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.

直接应用上面的资源清单即可:

$ kubectl apply -f deploy/redis.yaml
component.dapr.io/pubsub created
$ kubectl get components
NAME         AGE
pubsub       26s
statestore   45h
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

现在我们就有了一个使用 Redis 为中间件的发布订阅组件了,注意上面对象的类型为 ​​pubsub.redis​​。

redis pubsub

接着我们就可以部署 Python、Node 和 React-form 这 3 个微服了:

$ kubectl apply -f deploy/node-subscriber.yaml
$ kubectl apply -f deploy/python-subscriber.yaml
$ kubectl apply -f deploy/react-form.yaml
  • 1.
  • 2.
  • 3.

部署后查看 Pod 的状态:

$ kubectl get pods
NAME                                 READY   STATUS    RESTARTS         AGE
node-subscriber-5b5777c785-z8jzn     2/2     Running   0                30m
python-subscriber-76d9fc6c87-ffj7r   2/2     Running   0                30m
react-form-68db4b7777-7qmtj          2/2     Running   0                30m
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.

react-form 这个微服务会通过一个 LoadBalancer 类型的 Service 来对外暴露服务:

$ kubectl get svc
NAME                     TYPE           CLUSTER-IP       EXTERNAL-IP    PORT(S)                               AGE
node-subscriber-dapr     ClusterIP      None             <none>         80/TCP,50001/TCP,50002/TCP,9090/TCP   31m
python-subscriber-dapr   ClusterIP      None             <none>         80/TCP,50001/TCP,50002/TCP,9090/TCP   31m
react-form               LoadBalancer   10.110.199.146   192.168.0.51   80:32510/TCP                          30m
react-form-dapr          ClusterIP      None             <none>         80/TCP,50001/TCP,50002/TCP,9090/TCP   30m
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.

然后我们就可以通过分配的 EXTERNAL-IP 访问前端服务了。同样在前端页面发送几个不同的消息通知,然后使用 kubectl logs 观察 Node 和 Python 订阅服务的日志。

$ kubectl logs --selector app=node-subscriber -c node-subscriber
$ kubectl logs --selector app=python-subscriber -c python-subscriber
  • 1.
  • 2.

pub-sub on K8s

如何工作

现在,我们已经在本地和 Kubernetes 中运行了订阅发布示例应用,接下来我们来分析下这是如何工作的。该应用程序分为两个订阅者和一个发布者。

Node 消息订阅服务

重新导航到 node-scriber 目录并查看 Node.js 订阅者代码 app.js,该服务通过 Express 暴露了三个 API 端点。第一个是 GET 端点:

app.get("/dapr/subscribe", (_req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "A",
      route: "A",
    },
    {
      pubsubname: "pubsub",
      topic: "B",
      route: "B",
    },
  ]);
});
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.

该段代码是告诉 Dapr 要订阅 pubsub 这个组件的哪些主题,其中的 ​​route​​ 表示使用路由到那个端点来处理消息,当部署(本地或 Kubernetes)时,Dapr 将调用服务以确定它是否订阅了任何内容。其他两个端点是后端点:

QQDNA9BY" data-card-editable="false" class="" data-syntax="java">
app.post("/A", (req, res) => {
  console.log("A: ", req.body.data.message);
  res.sendStatus(200);
});

app.post("/B", (req, res) => {
  console.log("B: ", req.body.data.message);
  res.sendStatus(200);
});
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.

这两个端点处理来自每个主题类型的消息,我们这里只是记录消息,当然在更复杂的应用程序中,这里就是需要处理业务逻辑的地方了。

此外我们也可以直接通过创建一个 Subscription 的对象来声明在哪些服务里面来订阅组件中的哪些主题。

Python 消息订阅服务

同样导航到 python-subscriber 目录,查看 Python 订阅服务的代码文件 app.py。与 Node.js 订阅者一样,我们暴露了三个 API 端点,只是这里使用的是 flask,第一个是 GET 端点:

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [{
        'pubsubname': 'pubsub', 'topic': 'A', 'route': 'A'
    }, {
        'pubsubname': 'pubsub', 'topic': 'C', 'route': 'C'
    }]
    return jsonify(subscriptions)
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

同样的方式,这是告诉 Dapr 要订阅 pubsub 组件的哪些主题,这里我们订阅的组件名为 pubsub 的,主题为 A 和 C,这些主题的消息通过其他两个路由进行处理:

@app.route('/A', methods=['POST'])
def a_subscriber():
    print(f'A: {request.json}', flush=True)
    print('Received message "{}" on topic "{}"'.format(request.json['data']['message'], request.json['topic']), flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}

@app.route('/C', methods=['POST'])
def c_subscriber():
    print(f'C: {request.json}', flush=True)
    print('Received message "{}" on topic "{}"'.format(request.json['data']['message'], request.json['topic']), flush=True)
    return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.

React 前端应用

上面是两个订阅服务,接下来查看下发布者,我们的发布者分为客户端和服务器。

客户端是一个简单的单页 React 应用程序,使用 Create React App 启动,相关的客户端代码位于react-form/client/src/MessageForm.js,当用户提交表单时,将使用最新的聚合 JSON 数据更新 React 状态。默认情况下,数据设置为:

{
    messageType: "A",
    message: ""
};
  • 1.
  • 2.
  • 3.
  • 4.

提交表单后,聚合的 JSON 数据将发送到服务器:

fetch("/publish", {
  headers: {
    Accept: "application/json",
    "Content-Type": "application/json",
  },
  method: "POST",
  body: JSON.stringify(this.state),
});
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.

服务端是一个典型的 express 应用程序,它暴露了一个 POST 端点:/publish。这样可以从客户端接收请求,并根据 Dapr 发布它们。Express 内置的 JSON 中间件函数用于解析传入请求中的 JSON:

app.use(express.json());
  • 1.

这样我们可以获取到提交的 messageType,可以确定使用哪个主题来发布消息。要使用 Dapr 来发布消息,同样也是直接使用 Dapr 提供的 API 端点 http://localhost:<DAPR_URL>/publish/<PUBSUB_NAME>/<TOPIC> 即可,根据获取到的数据构建 Dapr 消息发布的 URL,提交 JSON 数据,POST 请求还需要在成功完成后返回响应中的成功代码。

const publishUrl = `${daprUrl}/publish/${pubsubName}/${req.body?.messageType}`;
await axios.post(publishUrl, req.body);
return res.sendStatus(200);
  • 1.
  • 2.
  • 3.

daprUrl 的地址所在的端口可以用下面的代码来获取:

const daprUrl = `http://localhost:${process.env.DAPR_HTTP_PORT || 3500}/v1.0`;
  • 1.

默认情况下,Dapr 在 3500 上运行,但如果我们在本地运行 Dapr 并将其设置为其他端口(使用 CLI run 命令中的 --app-port 标志),则该端口将作为环境变量注入应用程序。

此外服务端还通过将默认主页 / 路由请求转发到构建的客户端代码来托管 React 应用程序本身:

app.get("/", function (_req, res) {
  res.sendFile(path.join(__dirname, "client/build", "index.html"));
});
  • 1.
  • 2.
  • 3.

所以我们可以直接通过服务端来访问到前端页面。

发布-订阅模式是我们微服务开发中非常重要的一个模式,可以用来实现高可伸缩性和松耦合。发布订阅通常用于需要高度可伸缩的大型应用程序,发布和订阅应用程序通常比传统的 client/server 应用程序具有更好的伸缩性。Pub-sub 允许我们完全解耦组件,发布者不必知道他们的任何订阅者,订阅者也不必知道发布者。这使得开发人员可以编写更精简的微服务,而不会直接依赖彼此。

从上面的示例可以看出 Dapr 中使用发布订阅模式进行开发就完全变成了面向 ​​localhost​​ 编程了。