跳到主要内容
Deno 2.4 发布,带来了 deno bundle、bytes/text 导入、稳定的 OTel 等功能
了解更多
Announcing Deno Queues

在不断发展的云软件世界中,Deno 旨在彻底简化开发。利用公共云基础设施传统上需要筛选大量的样板代码和复杂的配置,这通常会占用开发人员大量的时间和精力。我们的目标是将这些复杂性提炼为用户友好的原语,使开发人员能够以无与伦比的速度设计、完善和发布他们的项目。

观看我们的 Deno Queues 发布公告。

考虑到这一点,我们几个月前推出了 Deno KV目前处于公开测试阶段)。Deno KV 基于 FoundationDB 强大的功能构建,它不仅仅是应用程序的一种新的持久化选项。它旨在通过消除冗余配置和提供耳目一新的简化 API 来改变开发人员的体验。

在此基础之上(双关语),我们今天很高兴推出 Deno Queues。这个工具将彻底改变可伸缩消息传递,并提升应用程序中后台处理的管理。

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  await postToSlack(msg.channel, msg.text);
});

await db.enqueue({ channel: "C123456", text: "Slack message" }, {
  delay: 60000,
});

在这篇文章中,我们将介绍 Deno Queues 的主要方面

什么是 Deno Queues?

Deno Queues 基于 Deno KV 构建,允许您卸载应用程序的部分功能或安排未来异步运行的工作,提供了两个新的简单 API,无需任何配置或基础设施维护。

  • .enqueue():将新消息推送到队列中,以确保立即或在未来某个时间送达。
  • .listenQueue():用于处理来自队列的新消息的处理程序。

由于 Queues 是基于 Deno KV 构建的,它在本地运行时使用 SQLite,在 Deno Deploy 上运行时使用 FoundationDB,以实现最大的可用性和吞吐量。

在 Deno Deploy 上运行 Queues 已针对性能进行优化。Deno Deploy 会按需自动启动 V8 隔离环境,并在消息可用时进行分派处理。您的应用程序代码只需使用 listenQueue 处理程序监听新消息,Deno Deploy 会处理其余部分。

Deno Queues 保证至少一次交付。 对于大多数入队消息,listenQueue 处理程序将只被调用一次。在某些故障情况下,处理程序可能会被多次调用以确保交付。重要的是要设计您的应用程序以确保正确处理重复消息。

您还可以将 Queues 与 KV 原子事务原语结合使用,这将解锁强大的工作流。例如,您可以将消息作为 KV 事务的一部分添加到队列中,该事务将原子性地成功或失败

const kv = await Deno.openKv();
const change = 10;

const bob = await kv.get(["balance", "bob"]);
const liz = await kv.get(["balance", "liz"]);
if (bob.value < change) {
  throw "not enough balance";
}

const success = await kv.atomic()
  .check(bob, liz) // balances did not change
  .set(["balance", "bob"], bob.value - change)
  .set(["balance", "liz"], liz.value + change)
  // Enqueue a message to notify Liz and Bob
  .enqueue({ type: "notify", name: "liz", amount: change })
  .enqueue({ type: "notify", name: "bob", amount: -change })
  .commit();
作为原子事务的一部分将新消息入队——只有当整个事务成功时,它们才会被入队。

您还可以从 listenQueue 处理程序更新 Deno KV 状态。例如,如果您想确保每个消息的更新只执行一次,您也可以将 Queue API 与 KV 原子事务结合使用

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  const nonce = await db.get(["nonces", msg.nonce]);
  if (nonce.value === null) {
    // This messaged was already processed.
    return;
  }

  const change = msg.change;
  const bob = await db.get(["balance", "bob"]);
  const liz = await db.get(["balance", "liz"]);

  const success = await db.atomic()
    // Ensure this message was not yet processed
    .check({ key: nonce.key, versionstamp: nonce.versionstamp })
    .delete(nonce.key)
    .sum(["processed_count"], 1n)
    .check(bob, liz) // balances did not change
    .set(["balance", "bob"], bob.value - change)
    .set(["balance", "liz"], liz.value + change)
    .commit();
});

const nonce = crypto.randomUUID();
await db
  .atomic()
  .check({ key: ["nonces", nonce], versionstamp: null })
  .enqueue({ nonce, change: 10 })
  .set(["nonces", nonce], true)
  .sum(["enqueued_count"], 1n)
  .commit();
此示例使用 KV 原子事务来确保每个消息只更新一次。

此外,如果您的 listenQueue 处理程序抛出异常,运行时将自动重试调用该处理程序,直到成功或达到最大重试次数。如果达到最大尝试次数(当前默认为 5 次),消息将被丢弃。

用例和示例

队列通过允许服务器卸载异步进程和安排未来的工作,在扩展应用程序方面非常有用。

以下是一些示例。

定时邮件通知

有时,用户发起的一项任务可能需要较长时间,您不希望让用户等待“任务完成”响应,或者根本不需要发送响应。这时您可以将工作卸载到队列中,以保持服务器或应用程序对用户保持响应。

以下是如何使用队列发送电子邮件通知的方法

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  if (msg.type === "welcome_email") {
    await sendWelcomeEmail(msg.customer_id);
  } else if (msg.type === "survey_email") {
    await sendSurveyEmail(msg.customer_id);
  }
});

await db.enqueue(
  { type: "welcome_email", customer_id: 123 },
);

await db.enqueue(
  { type: "survey_email", customer_id: 123 },
  { delay: 259200000 }, // deliver in 3 days
);

可靠的 Webhook 处理

在 Web 上使用队列的另一个非常常见的例子是处理 Webhook。以下是一个使用 Oak 和 Queues 异步处理 Webhook 的示例

import { Application, Router } from "https://deno.land/x/oak@v12.6.1/mod.ts";

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  await processWebHook(msg.webhook_body);
});

const router = new Router();
router.post("/webhook", async (ctx) => {
  db.enqueue({ webhook_body: await ctx.request.body().value });
  ctx.response.status = 200;
});

const app = new Application();
app.use(router.routes());
app.use(router.allowedMethods());

await app.listen({ port: 8000 });

Slack 提醒机器人

队列非常适合在 Discord 或 Slack 中构建机器人。

这里有一个示例,展示了如何使用 Deno Queues 在 Slack 中创建一个简单的提醒应用程序。

Receiving a reminder in Slack

这是 一个 Discord 机器人,它使用 Deno Queues 创建赠品并允许用户一键加入。

Creating a giveaway in Discord

更多示例

更多队列使用示例可在 docs.deno.com 找到。

Deno Queues 定价

当您探索 Queues 的功能时,了解其成本影响至关重要。Queues 本身没有特定的成本,而是根据 Deno KV 操作和 Deno Deploy 请求(用于监听)进行计费。具体来说,

消息入队:每个入队操作都会转换为一次 KV 写入操作。

接收消息:每条接收到的消息都涉及一次 KV 写入和一次请求费用。

这种透明的定价结构确保您只为使用的操作付费,符合我们对效率和简单性的承诺。

其他资源

下一步是什么

在 Web 上构建可伸缩应用程序和服务器需要将后台任务卸载到队列中。然而,配置它们需要许多步骤。Deno Queues 内置于运行时中,并建立在 Deno Deploy 的强大基础设施之上,让您只需几行代码即可使用无服务器的分布式队列。

Deno Queues 与 Deno KVWeb 标准 APInpm 以及 一体化的现代工具一起,成为使 Web 开发更简单、更高效的关键构建块。我们距离目标还有很长的路要走,路线图上还有许多令人兴奋的功能。敬请关注。

我们始终欢迎反馈和功能请求!请随意 加入我们不断壮大的 Discord在此创建问题