Deno Queues 发布公告
在不断发展的云软件世界中,Deno 旨在彻底简化开发。利用公共云基础设施传统上需要筛选大量的样板代码和复杂的配置,这通常会占用开发人员大量的时间和精力。我们的目标是将这些复杂性提炼为用户友好的原语,使开发人员能够以无与伦比的速度设计、完善和发布他们的项目。
考虑到这一点,我们几个月前推出了 Deno KV(目前处于公开测试阶段)。Deno KV 基于 FoundationDB 强大的功能构建,它不仅仅是应用程序的一种新的持久化选项。它旨在通过消除冗余配置和提供耳目一新的简化 API 来改变开发人员的体验。
在此基础之上(双关语),我们今天很高兴推出 Deno Queues。这个工具将彻底改变可伸缩消息传递,并提升应用程序中后台处理的管理。
const db = await Deno.openKv();
db.listenQueue(async (msg) => {
await postToSlack(msg.channel, msg.text);
});
await db.enqueue({ channel: "C123456", text: "Slack message" }, {
delay: 60000,
});
在这篇文章中,我们将介绍 Deno Queues 的主要方面
什么是 Deno Queues?
Deno Queues 基于 Deno KV 构建,允许您卸载应用程序的部分功能或安排未来异步运行的工作,提供了两个新的简单 API,无需任何配置或基础设施维护。
.enqueue()
:将新消息推送到队列中,以确保立即或在未来某个时间送达。.listenQueue()
:用于处理来自队列的新消息的处理程序。
由于 Queues 是基于 Deno KV 构建的,它在本地运行时使用 SQLite,在 Deno Deploy 上运行时使用 FoundationDB,以实现最大的可用性和吞吐量。
在 Deno Deploy 上运行 Queues 已针对性能进行优化。Deno Deploy 会按需自动启动 V8 隔离环境,并在消息可用时进行分派处理。您的应用程序代码只需使用 listenQueue
处理程序监听新消息,Deno Deploy 会处理其余部分。
Deno Queues 保证至少一次交付。 对于大多数入队消息,listenQueue
处理程序将只被调用一次。在某些故障情况下,处理程序可能会被多次调用以确保交付。重要的是要设计您的应用程序以确保正确处理重复消息。
您还可以将 Queues 与 KV 原子事务原语结合使用,这将解锁强大的工作流。例如,您可以将消息作为 KV 事务的一部分添加到队列中,该事务将原子性地成功或失败
const kv = await Deno.openKv();
const change = 10;
const bob = await kv.get(["balance", "bob"]);
const liz = await kv.get(["balance", "liz"]);
if (bob.value < change) {
throw "not enough balance";
}
const success = await kv.atomic()
.check(bob, liz) // balances did not change
.set(["balance", "bob"], bob.value - change)
.set(["balance", "liz"], liz.value + change)
// Enqueue a message to notify Liz and Bob
.enqueue({ type: "notify", name: "liz", amount: change })
.enqueue({ type: "notify", name: "bob", amount: -change })
.commit();
您还可以从 listenQueue
处理程序更新 Deno KV 状态。例如,如果您想确保每个消息的更新只执行一次,您也可以将 Queue API 与 KV 原子事务结合使用
const db = await Deno.openKv();
db.listenQueue(async (msg) => {
const nonce = await db.get(["nonces", msg.nonce]);
if (nonce.value === null) {
// This messaged was already processed.
return;
}
const change = msg.change;
const bob = await db.get(["balance", "bob"]);
const liz = await db.get(["balance", "liz"]);
const success = await db.atomic()
// Ensure this message was not yet processed
.check({ key: nonce.key, versionstamp: nonce.versionstamp })
.delete(nonce.key)
.sum(["processed_count"], 1n)
.check(bob, liz) // balances did not change
.set(["balance", "bob"], bob.value - change)
.set(["balance", "liz"], liz.value + change)
.commit();
});
const nonce = crypto.randomUUID();
await db
.atomic()
.check({ key: ["nonces", nonce], versionstamp: null })
.enqueue({ nonce, change: 10 })
.set(["nonces", nonce], true)
.sum(["enqueued_count"], 1n)
.commit();
此外,如果您的 listenQueue
处理程序抛出异常,运行时将自动重试调用该处理程序,直到成功或达到最大重试次数。如果达到最大尝试次数(当前默认为 5 次),消息将被丢弃。
用例和示例
队列通过允许服务器卸载异步进程和安排未来的工作,在扩展应用程序方面非常有用。
以下是一些示例。
定时邮件通知
有时,用户发起的一项任务可能需要较长时间,您不希望让用户等待“任务完成”响应,或者根本不需要发送响应。这时您可以将工作卸载到队列中,以保持服务器或应用程序对用户保持响应。
以下是如何使用队列发送电子邮件通知的方法
const db = await Deno.openKv();
db.listenQueue(async (msg) => {
if (msg.type === "welcome_email") {
await sendWelcomeEmail(msg.customer_id);
} else if (msg.type === "survey_email") {
await sendSurveyEmail(msg.customer_id);
}
});
await db.enqueue(
{ type: "welcome_email", customer_id: 123 },
);
await db.enqueue(
{ type: "survey_email", customer_id: 123 },
{ delay: 259200000 }, // deliver in 3 days
);
可靠的 Webhook 处理
在 Web 上使用队列的另一个非常常见的例子是处理 Webhook。以下是一个使用 Oak 和 Queues 异步处理 Webhook 的示例
import { Application, Router } from "https://deno.land/x/oak@v12.6.1/mod.ts";
const db = await Deno.openKv();
db.listenQueue(async (msg) => {
await processWebHook(msg.webhook_body);
});
const router = new Router();
router.post("/webhook", async (ctx) => {
db.enqueue({ webhook_body: await ctx.request.body().value });
ctx.response.status = 200;
});
const app = new Application();
app.use(router.routes());
app.use(router.allowedMethods());
await app.listen({ port: 8000 });
Slack 提醒机器人
队列非常适合在 Discord 或 Slack 中构建机器人。
这里有一个示例,展示了如何使用 Deno Queues 在 Slack 中创建一个简单的提醒应用程序。
这是 一个 Discord 机器人,它使用 Deno Queues 创建赠品并允许用户一键加入。
更多示例
更多队列使用示例可在 docs.deno.com 找到。
Deno Queues 定价
当您探索 Queues 的功能时,了解其成本影响至关重要。Queues 本身没有特定的成本,而是根据 Deno KV 操作和 Deno Deploy 请求(用于监听)进行计费。具体来说,
消息入队:每个入队操作都会转换为一次 KV 写入操作。
接收消息:每条接收到的消息都涉及一次 KV 写入和一次请求费用。
这种透明的定价结构确保您只为使用的操作付费,符合我们对效率和简单性的承诺。
其他资源
下一步是什么
在 Web 上构建可伸缩应用程序和服务器需要将后台任务卸载到队列中。然而,配置它们需要许多步骤。Deno Queues 内置于运行时中,并建立在 Deno Deploy 的强大基础设施之上,让您只需几行代码即可使用无服务器的分布式队列。
Deno Queues 与 Deno KV、Web 标准 API、npm 以及 一体化的现代工具一起,成为使 Web 开发更简单、更高效的关键构建块。我们距离目标还有很长的路要走,路线图上还有许多令人兴奋的功能。敬请关注。
我们始终欢迎反馈和功能请求!请随意 加入我们不断壮大的 Discord 或 在此创建问题。