Client
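Introduction
The Dapr Client allows you to communicate with the Dapr Sidecar and access its client-facing features, such as publishing events, invoking output bindings, state management, secret management, and more.
Prerequisites
Installing and importing Dapr's JS SDK
- Install the SDK with npm:
npm i @dapr/dapr --save
- Import the libraries: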
import { DaprClient, DaprServer, HttpMethod, CommunicationProtocolEnum } from "@dapr/dapr";
const daprHost = "127.0.0.1"; // Dapr Sidecar Host
const daprPort = "3500"; // Dapr Sidecar Port of this Example Server
const serverHost = "127.0.0.1"; // App Host of this Example Server
const serverPort = "50051"; // App Port of this Example Server
// HTTP example (default protocol)
const client = new DaprClient({ daprHost, daprPort });
// gRPC example: an alternative to the HTTP client above, so it gets its own name here
const grpcClient = new DaprClient({ daprHost, daprPort, communicationProtocol: CommunicationProtocolEnum.GRPC });
Running
To run the examples, you can use one of two protocols to interact with the Dapr Sidecar: HTTP (the default) or gRPC.
Using HTTP (default)
import { DaprClient } from "@dapr/dapr";
const client = new DaprClient({ daprHost, daprPort });
# Using dapr run
dapr run --app-id example-sdk --app-protocol http -- npm run start
# or, using npm script
npm run start:dapr-http
Using gRPC
Since HTTP is the default, you have to adapt the communication protocol to use gRPC. You can do this by passing an extra argument to the client or server constructor.
import { DaprClient, CommunicationProtocolEnum } from "@dapr/dapr";
const client = new DaprClient({ daprHost, daprPort, communicationProtocol: CommunicationProtocolEnum.GRPC });
# Using dapr run
dapr run --app-id example-sdk --app-protocol grpc -- npm run start
# or, using npm script
npm run start:dapr-grpc
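Environment Variables
Dapr Sidecar Endpoints
You can use the DAPR_HTTP_ENDPOINT and DAPR_GRPC_ENDPOINT environment variables to set the Dapr Sidecar's HTTP and gRPC endpoints respectively. When these variables are set, daprHost and daprPort don't have to be set in the options argument of the constructor; the client parses them automatically out of the provided endpoints.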
import { DaprClient, CommunicationProtocolEnum } from "@dapr/dapr";
// Using HTTP, when DAPR_HTTP_ENDPOINT is set
const client = new DaprClient();
// Using gRPC, when DAPR_GRPC_ENDPOINT is set (an alternative to the HTTP client above)
const grpcClient = new DaprClient({ communicationProtocol: CommunicationProtocolEnum.GRPC });
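If the environment variables are set but daprHost and daprPort values are also passed to the constructor, the latter take precedence over the environment variables.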
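The precedence rule can be pictured with a small stand-alone function. This is only a sketch of the behaviour described here, not the SDK's actual implementation: `resolveSidecarAddress`, `AddressOptions`, and `SidecarAddress` are hypothetical names, and the `127.0.0.1:3500` fallback is an assumption borrowed from the examples in this document.

```typescript
// Hypothetical helper, not part of @dapr/dapr: illustrates the precedence rule only.
interface SidecarAddress {
  host: string;
  port: string;
}

interface AddressOptions {
  daprHost?: string;
  daprPort?: string;
}

function resolveSidecarAddress(
  options: AddressOptions,
  endpoint: string | undefined, // e.g. the value of DAPR_HTTP_ENDPOINT
  fallback: SidecarAddress = { host: "127.0.0.1", port: "3500" }, // assumed default
): SidecarAddress {
  let fromEnv: Partial<SidecarAddress> = {};
  if (endpoint) {
    const url = new URL(endpoint);
    fromEnv = {
      host: url.hostname,
      // URL.port is empty when the scheme's default port is used
      port: url.port || (url.protocol === "https:" ? "443" : "80"),
    };
  }
  // Constructor options win, then the endpoint variable, then the fallback
  return {
    host: options.daprHost ?? fromEnv.host ?? fallback.host,
    port: options.daprPort ?? fromEnv.port ?? fallback.port,
  };
}
```

In an application the second argument would typically be `process.env.DAPR_HTTP_ENDPOINT` or `process.env.DAPR_GRPC_ENDPOINT`.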
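Dapr API Token
You can use the DAPR_API_TOKEN environment variable to set the Dapr API token. When this variable is set, daprApiToken doesn't have to be set in the options argument of the constructor; the client picks it up automatically.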
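For context, a Dapr sidecar with token authentication enabled expects the token in the `dapr-api-token` header (HTTP) or metadata entry (gRPC). The sketch below shows how such a header could be derived. `buildDaprAuthHeaders` is a hypothetical helper rather than an SDK function, and letting an explicitly passed token win over the environment variable is an assumption, not something this document states.

```typescript
// Hypothetical helper, not part of @dapr/dapr.
function buildDaprAuthHeaders(
  optionToken: string | undefined, // value passed as daprApiToken, if any
  envToken: string | undefined, // value of DAPR_API_TOKEN, if any
): Record<string, string> {
  // Assumption: an explicitly passed token wins over the environment variable
  const token = optionToken || envToken;
  // No token configured: send no authentication header at all
  return token ? { "dapr-api-token": token } : {};
}
```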
General
Increasing Body Size
You can increase the body size used by the application to communicate with the sidecar through a DaprClient option.
import { DaprClient, CommunicationProtocolEnum } from "@dapr/dapr";
// Allow a body size of 10Mb to be used
// The default is 4Mb
const client = new DaprClient({
  daprHost,
  daprPort,
  communicationProtocol: CommunicationProtocolEnum.HTTP,
  maxBodySizeMb: 10,
});
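Proxying Requests
By proxying requests, we can use the capabilities that Dapr's sidecar architecture brings, such as service discovery and logging, to instantly "upgrade" our gRPC services. gRPC proxying was demonstrated in community call 41.
Creating a Proxy
To perform gRPC proxying, create a proxy by calling the client.proxy.create() method: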
// As always, create a client to our dapr sidecar
// this client takes care of making sure the sidecar is started, that we can communicate, ...
const clientSidecar = new DaprClient({ daprHost, daprPort, communicationProtocol: CommunicationProtocolEnum.GRPC });
// Create a Proxy that allows us to use our gRPC code
// (GreeterClient is the generated gRPC client class of the service being proxied)
const clientProxy = await clientSidecar.proxy.create<GreeterClient>(GreeterClient);
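We can now call the methods defined in our GreeterClient interface (which in this case comes from the Hello World example).
Behind the Scenes (Technical Working)
- The gRPC service gets started in Dapr. We tell Dapr which port this gRPC server is running on through --app-port and give it a unique Dapr app ID with --app-id <APP_ID_HERE>
- We can now call the Dapr Sidecar through a client that connects to the Sidecar
- While calling the Dapr Sidecar, we provide a metadata key named dapr-app-id whose value is the name of our gRPC server started in Dapr (e.g., server in our example)
- Dapr will now forward the call to the configured gRPC server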
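The metadata step in the list above can be sketched as follows. This is a minimal illustration that assumes call metadata can be modelled as a plain string map; `CallMetadata` and `withDaprAppId` are hypothetical names and not part of @dapr/dapr or of any gRPC library.

```typescript
// Hypothetical model of gRPC call metadata as a plain string map.
type CallMetadata = Record<string, string>;

// Returns a copy of the metadata with the routing key that tells the
// sidecar which Dapr app ID the call should be forwarded to.
function withDaprAppId(metadata: CallMetadata, appId: string): CallMetadata {
  return { ...metadata, "dapr-app-id": appId };
}

// "server" is the app ID used in this document's example
const routed = withDaprAppId({ "x-request-id": "abc" }, "server");
```

The proxy created with client.proxy.create() handles this step for you, so application code normally does not set the key by hand.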
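Building Blocks
The JavaScript Client SDK allows you to interface with all of the Dapr building blocks, focusing on client-to-sidecar features.
Invocation API
Invoke a Service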
import { DaprClient, HttpMethod } from "@dapr/dapr";
const daprHost = "127.0.0.1";
const daprPort = "3500";
async function start() {
  const client = new DaprClient({ daprHost, daprPort });
  const serviceAppId = "my-app-id";
  const serviceMethod = "say-hello";
  // POST Request
  const postResponse = await client.invoker.invoke(serviceAppId, serviceMethod, HttpMethod.POST, { hello: "world" });
  // POST Request with headers
  const postWithHeadersResponse = await client.invoker.invoke(
    serviceAppId,
    serviceMethod,
    HttpMethod.POST,
    { hello: "world" },
    { headers: { "X-User-ID": "123" } },
  );
  // GET Request
  const getResponse = await client.invoker.invoke(serviceAppId, serviceMethod, HttpMethod.GET);
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
For a full guide on service invocation visit How-To: Invoke a service.
State Management API
Save, Get and Delete application state
import { DaprClient } from "@dapr/dapr";
const daprHost = "127.0.0.1";
const daprPort = "3500";
async function start() {
  const client = new DaprClient({ daprHost, daprPort });
  const serviceStoreName = "my-state-store-name";
  // Save State
  const saveResponse = await client.state.save(
    serviceStoreName,
    [
      {
        key: "first-key-name",
        value: "hello",
        metadata: {
          foo: "bar",
        },
      },
      {
        key: "second-key-name",
        value: "world",
      },
    ],
    {
      metadata: {
        ttlInSeconds: "3", // this should override the ttl in the state item
      },
    },
  );
  // Get State
  const getResponse = await client.state.get(serviceStoreName, "first-key-name");
  // Get Bulk State
  const bulkResponse = await client.state.getBulk(serviceStoreName, ["first-key-name", "second-key-name"]);
  // State Transactions
  await client.state.transaction(serviceStoreName, [
    {
      operation: "upsert",
      request: {
        key: "first-key-name",
        value: "new-data",
      },
    },
    {
      operation: "delete",
      request: {
        key: "second-key-name",
      },
    },
  ]);
  // Delete State
  const deleteResponse = await client.state.delete(serviceStoreName, "first-key-name");
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
For a full list of state operations visit How-To: Get & save state.
Query State API
import { DaprClient } from "@dapr/dapr";
const daprHost = "127.0.0.1";
const daprPort = "3500";
async function start() {
  const client = new DaprClient({ daprHost, daprPort });
  const res = await client.state.query("state-mongodb", {
    filter: {
      OR: [
        {
          EQ: { "person.org": "Dev Ops" },
        },
        {
          AND: [
            {
              EQ: { "person.org": "Finance" },
            },
            {
              IN: { state: ["CA", "WA"] },
            },
          ],
        },
      ],
    },
    sort: [
      {
        key: "state",
        order: "DESC",
      },
    ],
    page: {
      limit: 10,
    },
  });
  console.log(res);
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
Pub/Sub API
Publish messages
import { DaprClient } from "@dapr/dapr";
const daprHost = "127.0.0.1";
const daprPort = "3500";
async function start() {
  const client = new DaprClient({ daprHost, daprPort });
  const pubSubName = "my-pubsub-name";
  const topic = "topic-a";
  // Publish message to topic as text/plain
  // Note, the content type is inferred from the message type unless specified explicitly
  const response = await client.pubsub.publish(pubSubName, topic, "hello, world!");
  // If publish fails, response contains the error
  console.log(response);
  // Publish message to topic as application/json
  await client.pubsub.publish(pubSubName, topic, { hello: "world" });
  // Publish a JSON message as plain text
  const plainTextOptions = { contentType: "text/plain" };
  await client.pubsub.publish(pubSubName, topic, { hello: "world" }, plainTextOptions);
  // Publish message to topic as application/cloudevents+json
  // You can also use the cloudevent SDK to create cloud events https://github.com/cloudevents/sdk-javascript
  const cloudEvent = {
    specversion: "1.0",
    source: "/some/source",
    type: "example",
    id: "1234",
  };
  await client.pubsub.publish(pubSubName, topic, cloudEvent);
  // Publish a cloudevent as raw payload
  const rawPayloadOptions = { metadata: { rawPayload: true } };
  await client.pubsub.publish(pubSubName, topic, "hello, world!", rawPayloadOptions);
  // Publish multiple messages to a topic as text/plain
  await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
  // Publish multiple messages to a topic as application/json
  await client.pubsub.publishBulk(pubSubName, topic, [
    { hello: "message 1" },
    { hello: "message 2" },
    { hello: "message 3" },
  ]);
  // Publish multiple messages with explicit bulk publish messages
  const bulkPublishMessages = [
    {
      entryID: "entry-1",
      contentType: "application/json",
      event: { hello: "foo message 1" },
    },
    {
      entryID: "entry-2",
      contentType: "application/cloudevents+json",
      event: { ...cloudEvent, data: "foo message 2", datacontenttype: "text/plain" },
    },
    {
      entryID: "entry-3",
      contentType: "text/plain",
      event: "foo message 3",
    },
  ];
  await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
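The comments in the example above say that the content type is inferred from the message unless one is given explicitly. The sketch below mirrors the three cases shown there (string, plain object, CloudEvent). It is not the SDK's own inference code: `inferContentType` and `looksLikeCloudEvent` are hypothetical names, and the CloudEvent check is an assumption based only on the four attributes used in the example.

```typescript
// Hypothetical helpers, not part of @dapr/dapr: they mirror the examples above only.
type PubSubPayload = string | object;

// Assumption: an object counts as a CloudEvent when it carries the four
// string attributes used in the example above.
function looksLikeCloudEvent(value: object): boolean {
  const candidate = value as Record<string, unknown>;
  return ["specversion", "source", "type", "id"].every((field) => typeof candidate[field] === "string");
}

function inferContentType(message: PubSubPayload, explicit?: string): string {
  // An explicitly requested content type always wins
  if (explicit) return explicit;
  if (typeof message === "string") return "text/plain";
  return looksLikeCloudEvent(message) ? "application/cloudevents+json" : "application/json";
}
```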
Bindings API
Invoke Output Binding
Output Bindings
import { DaprClient } from "@dapr/dapr";
const daprHost = "127.0.0.1";
const daprPort = "3500";
async function start() {
  const client = new DaprClient({ daprHost, daprPort });
  const bindingName = "my-binding-name";
  const bindingOperation = "create";
  const message = { hello: "world" };
  const response = await client.binding.send(bindingName, bindingOperation, message);
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
For a full guide on output bindings visit How-To: Use bindings.
Secret API
Retrieve secrets
import { DaprClient } from "@dapr/dapr";
const daprHost = "127.0.0.1";
const daprPort = "3500";
async function start() {
  const client = new DaprClient({ daprHost, daprPort });
  const secretStoreName = "my-secret-store";
  const secretKey = "secret-key";
  // Retrieve a single secret from secret store
  const secret = await client.secret.get(secretStoreName, secretKey);
  // Retrieve all secrets from secret store
  const allSecrets = await client.secret.getBulk(secretStoreName);
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
For a full guide on secrets visit How-To: Retrieve secrets.
Configuration API
Get Configuration Keys
import { DaprClient, CommunicationProtocolEnum } from "@dapr/dapr";
const daprHost = "127.0.0.1";
async function start() {
  const client = new DaprClient({
    daprHost,
    daprPort: process.env.DAPR_GRPC_PORT,
    communicationProtocol: CommunicationProtocolEnum.GRPC,
  });
  const config = await client.configuration.get("config-store", ["key1", "key2"]);
  console.log(config);
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
Sample output:
{
  items: {
    key1: { key: 'key1', value: 'foo', version: '', metadata: {} },
    key2: { key: 'key2', value: 'bar2', version: '', metadata: {} }
  }
}
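Application code often only needs the plain values from that response. The sketch below flattens it into a key-to-value map. It assumes the response has exactly the shape of the sample output; `ConfigItem`, `ConfigResponse`, and `toKeyValueMap` are hypothetical names declared here for illustration, not types or functions exported by the SDK.

```typescript
// Hypothetical types modelled on the sample output above, not SDK types.
interface ConfigItem {
  key: string;
  value: string;
  version: string;
  metadata: Record<string, string>;
}

interface ConfigResponse {
  items: Record<string, ConfigItem>;
}

// Collapses the response into a simple { key: value } lookup.
function toKeyValueMap(response: ConfigResponse): Record<string, string> {
  const result: Record<string, string> = {};
  for (const key of Object.keys(response.items)) {
    result[key] = response.items[key].value;
  }
  return result;
}

// Using the data from the sample output
const values = toKeyValueMap({
  items: {
    key1: { key: "key1", value: "foo", version: "", metadata: {} },
    key2: { key: "key2", value: "bar2", version: "", metadata: {} },
  },
});
```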
Subscribe to Configuration Updates
import { DaprClient, CommunicationProtocolEnum } from "@dapr/dapr";
const daprHost = "127.0.0.1";
async function start() {
  const client = new DaprClient({
    daprHost,
    daprPort: process.env.DAPR_GRPC_PORT,
    communicationProtocol: CommunicationProtocolEnum.GRPC,
  });
  // Subscribes to config store changes for keys "key1" and "key2"
  const stream = await client.configuration.subscribeWithKeys("config-store", ["key1", "key2"], async (data) => {
    console.log("Subscribe received updates from config store: ", data);
  });
  // Wait for 60 seconds and unsubscribe.
  await new Promise((resolve) => setTimeout(resolve, 60000));
  stream.stop();
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
Sample output:
Subscribe received updates from config store: {
  items: { key2: { key: 'key2', value: 'bar', version: '', metadata: {} } }
}
Subscribe received updates from config store: {
  items: { key1: { key: 'key1', value: 'foobar', version: '', metadata: {} } }
}
Cryptography API
Support for the cryptography API is only available on the gRPC client in the JavaScript SDK.
import { createReadStream, createWriteStream } from "node:fs";
import { readFile, writeFile } from "node:fs/promises";
import { pipeline } from "node:stream/promises";
import { DaprClient, CommunicationProtocolEnum } from "@dapr/dapr";
const daprHost = "127.0.0.1";
const daprPort = "50050"; // Dapr Sidecar Port of this example server
async function start() {
  const client = new DaprClient({
    daprHost,
    daprPort,
    communicationProtocol: CommunicationProtocolEnum.GRPC,
  });
  // Encrypt and decrypt a message using streams
  await encryptDecryptStream(client);
  // Encrypt and decrypt a message from a buffer
  await encryptDecryptBuffer(client);
}
async function encryptDecryptStream(client: DaprClient) {
  // First, encrypt the message
  console.log("== Encrypting message using streams");
  console.log("Encrypting plaintext.txt to ciphertext.out");
  await pipeline(
    createReadStream("plaintext.txt"),
    await client.crypto.encrypt({
      componentName: "crypto-local",
      keyName: "symmetric256",
      keyWrapAlgorithm: "A256KW",
    }),
    createWriteStream("ciphertext.out"),
  );
  // Decrypt the message
  console.log("== Decrypting message using streams");
  console.log("Decrypting ciphertext.out to plaintext.out");
  await pipeline(
    createReadStream("ciphertext.out"),
    await client.crypto.decrypt({
      componentName: "crypto-local",
    }),
    createWriteStream("plaintext.out"),
  );
}
async function encryptDecryptBuffer(client: DaprClient) {
  // Read "plaintext.txt" so we have some content
  const plaintext = await readFile("plaintext.txt");
  // First, encrypt the message
  console.log("== Encrypting message using buffers");
  const ciphertext = await client.crypto.encrypt(plaintext, {
    componentName: "crypto-local",
    keyName: "my-rsa-key",
    keyWrapAlgorithm: "RSA",
  });
  await writeFile("test.out", ciphertext);
  // Decrypt the message
  console.log("== Decrypting message using buffers");
  const decrypted = await client.crypto.decrypt(ciphertext, {
    componentName: "crypto-local",
  });
  // The contents should be equal
  if (plaintext.compare(decrypted) !== 0) {
    throw new Error("Decrypted message does not match original message");
  }
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
For a full guide on cryptography visit How-To: Cryptography.
Distributed Lock API
Try Lock and Unlock APIs
import { DaprClient } from "@dapr/dapr";
import { LockStatus } from "@dapr/dapr/types/lock/UnlockResponse";
const daprHost = "127.0.0.1";
const daprPort = "3500";
async function start() {
  const client = new DaprClient({ daprHost, daprPort });
  const storeName = "redislock";
  const resourceId = "resourceId";
  const lockOwner = "owner1";
  const expiryInSeconds = 1000;
  console.log(`Acquiring lock on ${storeName}, ${resourceId} as owner: ${lockOwner}`);
  const lockResponse = await client.lock.lock(storeName, resourceId, lockOwner, expiryInSeconds);
  console.log(lockResponse);
  console.log(`Unlocking on ${storeName}, ${resourceId} as owner: ${lockOwner}`);
  const unlockResponse = await client.lock.unlock(storeName, resourceId, lockOwner);
  console.log("Unlock API response: " + getResponseStatus(unlockResponse.status));
}
// Maps the unlock status enum to a readable label
function getResponseStatus(status: LockStatus) {
  switch (status) {
    case LockStatus.Success:
      return "Success";
    case LockStatus.LockDoesNotExist:
      return "LockDoesNotExist";
    case LockStatus.LockBelongsToOthers:
      return "LockBelongsToOthers";
    default:
      return "InternalError";
  }
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});
For a full guide on distributed locks visit How-To: Use Distributed Locks.
Workflow API
Workflow management
import { DaprClient } from "@dapr/dapr";
async function start() {
  const client = new DaprClient();
  // Start a new workflow instance
  const instanceId = await client.workflow.start("OrderProcessingWorkflow", {
    Name: "Paperclips",
    TotalCost: 99.95,
    Quantity: 4,
  });
  console.log(`Started workflow instance ${instanceId}`);
  // Get a workflow instance
  const workflow = await client.workflow.get(instanceId);
  console.log(
    `Workflow ${workflow.workflowName}, created at ${workflow.createdAt.toUTCString()}, has status ${
      workflow.runtimeStatus
    }`,
  );
  console.log(`Additional properties: ${JSON.stringify(workflow.properties)}`);
  // Pause a workflow instance
  await client.workflow.pause(instanceId);
  console.log(`Paused workflow instance ${instanceId}`);
  // Resume a workflow instance
  await client.workflow.resume(instanceId);
  console.log(`Resumed workflow instance ${instanceId}`);
  // Terminate a workflow instance
  await client.workflow.terminate(instanceId);
  console.log(`Terminated workflow instance ${instanceId}`);
  // Purge a workflow instance
  await client.workflow.purge(instanceId);
  console.log(`Purged workflow instance ${instanceId}`);
}
start().catch((e) => {
  console.error(e);
  process.exit(1);
});