在不断发展的云软件世界中,Deno 旨在从根本上进行简化。传统上,利用公共云基础设施需要筛选大量的样板代码和复杂的配置,这通常会占用开发人员大量的时间和精力。我们的目标是将这些复杂性提炼成用户友好的原语,使开发人员能够以无与伦比的速度设计、改进和启动他们的项目。
考虑到这一点,我们在几个月前推出了 Deno KV(目前处于公开测试阶段)。Deno KV 基于 FoundationDB 的强大功能,不仅仅是应用程序的一种新的持久化选项。它还通过消除冗余配置并提供令人耳目一新的简化 API 来转变开发人员的体验。
在这个基础之上(双关语),我们很高兴今天发布 Deno 队列。该工具旨在彻底改变可扩展消息传递,并提升应用程序中后台处理的管理水平。
const db = await Deno.openKv();
db.listenQueue(async (msg) => {
await postToSlack(msg.channel, msg.text);
});
await db.enqueue({ channel: "C123456", text: "Slack message" }, {
delay: 60000,
});
在这篇文章中,我们将介绍 Deno 队列的关键方面
什么是 Deno 队列?
Deno 队列基于 Deno KV 构建,允许您卸载应用程序的某些部分或安排未来的工作以异步运行,并提供两个新的简单 API,无需配置或维护基础设施
.enqueue()
:将新消息推送到队列中,以保证立即或在未来的某个时间交付。.listenQueue()
:用于处理来自队列的新消息的处理程序。
由于队列基于 Deno KV 构建,因此在本地运行时使用 SQLite,在 Deno Deploy 上运行时使用 FoundationDB,以实现最大的可用性和吞吐量。
在 Deno Deploy 上运行队列针对性能进行了优化。Deno Deploy 会自动按需启动 V8 隔离,并在消息可用时分派消息以进行处理。您的应用程序代码只需使用 listenQueue
处理程序监听新消息,Deno Deploy 会处理其余的事情。
Deno 队列保证至少一次交付。 对于大多数入队的消息,listenQueue
处理程序将被调用一次。在某些故障情况下,处理程序可能会被多次调用以确保交付。重要的是要设计您的应用程序,以确保正确处理重复消息。
您还可以将 队列与 KV 原子事务 原语结合使用,这可以解锁强大的工作流程。例如,您可以将消息添加到队列中,作为 KV 事务的一部分,该事务会原子性地成功或失败
const kv = await Deno.openKv();
const change = 10;
const bob = await kv.get(["balance", "bob"]);
const liz = await kv.get(["balance", "liz"]);
if (bob.value < change) {
throw "not enough balance";
}
const success = await kv.atomic()
.check(bob, liz) // balances did not change
.set(["balance", "bob"], bob.value - change)
.set(["balance", "liz"], liz.value + change)
// Enqueue a message to notify Liz and Bob
.enqueue({ type: "notify", name: "liz", amount: change })
.enqueue({ type: "notify", name: "bob", amount: -change })
.commit();
您还可以从 listenQueue
处理程序更新 Deno KV 状态。例如,如果您想确保每个消息的更新只执行一次,您也可以将队列 API 与 KV 原子事务一起使用
const db = await Deno.openKv();
db.listenQueue(async (msg) => {
const nonce = await db.get(["nonces", msg.nonce]);
if (nonce.value === null) {
// This messaged was already processed.
return;
}
const change = msg.change;
const bob = await db.get(["balance", "bob"]);
const liz = await db.get(["balance", "liz"]);
const success = await db.atomic()
// Ensure this message was not yet processed
.check({ key: nonce.key, versionstamp: nonce.versionstamp })
.delete(nonce.key)
.sum(["processed_count"], 1n)
.check(bob, liz) // balances did not change
.set(["balance", "bob"], bob.value - change)
.set(["balance", "liz"], liz.value + change)
.commit();
});
const nonce = crypto.randomUUID();
await db
.atomic()
.check({ key: ["nonces", nonce], versionstamp: null })
.enqueue({ nonce, change: 10 })
.set(["nonces", nonce], true)
.sum(["enqueued_count"], 1n)
.commit();
此外,如果您的 listenQueue
处理程序抛出异常,运行时将自动重试再次调用该处理程序,直到它成功或达到最大重试次数。如果达到最大尝试次数(当前默认值为 5),则该消息将被丢弃。
用例和示例
队列通过允许服务器卸载异步进程和安排未来的工作,在扩展应用程序中非常有用。
以下是一些示例。
计划的电子邮件通知
有时,用户发起的工作或任务可能需要足够长的时间,以至于您不想让他们等待“任务完成”响应,或者不需要向他们发送响应。这时您可以将工作卸载到队列中,以保持您的服务器或应用程序对用户的响应速度。
以下是如何使用队列发送电子邮件通知的方法
const db = await Deno.openKv();
db.listenQueue(async (msg) => {
if (msg.type === "welcome_email") {
await sendWelcomeEmail(msg.customer_id);
} else if (msg.type === "survey_email") {
await sendSurveyEmail(msg.customer_id);
}
});
await db.enqueue(
{ type: "welcome_email", customer_id: 123 },
);
await db.enqueue(
{ type: "survey_email", customer_id: 123 },
{ delay: 259200000 }, // deliver in 3 days
);
可靠的 Webhook 处理
在 Web 上使用队列的另一个非常常见的例子是通过处理 webhook。这是一个使用 Oak 和队列异步处理 webhook 的示例
import { Application, Router } from "https://deno.land/x/[email protected]/mod.ts";
const db = await Deno.openKv();
db.listenQueue(async (msg) => {
await processWebHook(msg.webhook_body);
});
const router = new Router();
router.post("/webhook", async (ctx) => {
db.enqueue({ webhook_body: await ctx.request.body().value });
ctx.response.status = 200;
});
const app = new Application();
app.use(router.routes());
app.use(router.allowedMethods());
await app.listen({ port: 8000 });
Slack 提醒机器人
队列非常适合在 Discord 或 Slack 中构建机器人。
这是一个示例,展示了如何使用 Deno 队列在 Slack 中创建一个简单的提醒应用程序。
而这是一个 Discord 机器人,它使用 Deno 队列来创建赠品并允许用户一键加入。
更多示例
有关队列用法的更多示例,请访问 docs.deno.com。
Deno 队列的定价
当您探索队列的功能时,了解成本影响非常重要。队列本身没有特定的成本,而是根据 Deno KV 操作和 Deno Deploy 请求(用于监听)收费。具体来说:
入队消息:每个入队操作都转化为 KV 写入操作。
接收消息:每条接收到的消息都需要一次 KV 写入和一次请求费用。
这种透明的定价结构确保您仅为您使用的操作付费,这与我们对效率和简洁的承诺相符。
其他资源
下一步是什么
在 Web 上构建可扩展的应用程序和服务器需要将后台任务卸载到队列。但是,配置它们以供使用需要许多步骤。Deno 队列内置于运行时,并构建在 Deno Deploy 的强大基础设施之上,让您只需几行代码即可使用无服务器分布式队列。
Deno 队列与 Deno KV、Web 标准 API、npm 和 一体化现代工具 一起,成为使 Web 创建更简单、更高效的关键构建块。 离我们的目标还很远,我们还有许多令人兴奋的功能正在规划中。敬请关注。
我们始终乐于接受反馈和功能请求!欢迎加入我们不断壮大的 Discord 社区 或 在此处创建 issue。