跳至主要内容
Deno 2 终于发布了 🎉️
了解更多
Announcing Deno Queues

Deno 队列发布公告

在不断变化的云软件世界中,Deno 旨在彻底简化。利用公共云基础设施传统上需要筛选大量样板代码和复杂的配置,这往往会占用开发人员的大量时间和精力。我们的目标是将这些复杂性提炼成用户友好的基本元素,使开发人员能够以无与伦比的速度设计、改进和启动他们的项目。

观看我们的 Deno 队列发布公告。

考虑到这一点,我们几个月前推出了 Deno KV目前处于公开测试阶段)。Deno KV 建立在 FoundationDB 的强大功能之上,不仅仅是应用程序的新持久性选项。它关乎通过消除冗余配置并提供令人耳目一新的简化 API 来改变开发人员体验。

基于此基础(双关语),我们很高兴今天推出 Deno 队列。此工具将彻底改变可扩展消息传递,并提升应用程序中后台处理的管理。

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  await postToSlack(msg.channel, msg.text);
});

await db.enqueue({ channel: "C123456", text: "Slack message" }, {
  delay: 60000,
});

在这篇文章中,我们将介绍 Deno 队列的关键方面

什么是 Deno 队列?

Deno 队列建立在 Deno KV 之上,使您可以卸载应用程序的某些部分,或安排未来的工作异步运行,并使用两个新的简单 API,无需任何配置或基础设施维护。

  • .enqueue():将新消息推送到队列中,以立即或在将来的某个时间进行保证的传递。
  • .listenQueue():用于处理来自队列的新消息的处理程序。

由于队列建立在 Deno KV 之上,因此它在本地运行时使用 SQLite,在 Deno Deploy 上运行时使用 FoundationDB,以实现最大可用性和吞吐量。

在 Deno Deploy 上运行队列经过优化,以提高性能。Deno Deploy 会按需自动启动 V8 隔离,并在有可处理的消息时调度消息。您的应用程序代码只需使用 listenQueue 处理程序监听新消息,Deno Deploy 会处理其余工作。

Deno 队列保证至少一次传递。对于大多数排队的消息,listenQueue 处理程序将被调用一次。在某些故障情况下,处理程序可能会被调用多次,以确保传递。重要的是要设计您的应用程序以确保正确处理重复消息。

您还可以将 队列与 KV 原子事务基本元素结合起来,这可以解锁强大的工作流程。例如,您可以在 KV 事务的一部分中添加消息到队列,该事务会以原子方式成功或失败。

const kv = await Deno.openKv();
const change = 10;

const bob = await kv.get(["balance", "bob"]);
const liz = await kv.get(["balance", "liz"]);
if (bob.value < change) {
  throw "not enough balance";
}

const success = await kv.atomic()
  .check(bob, liz) // balances did not change
  .set(["balance", "bob"], bob.value - change)
  .set(["balance", "liz"], liz.value + change)
  // Enqueue a message to notify Liz and Bob
  .enqueue({ type: "notify", name: "liz", amount: change })
  .enqueue({ type: "notify", name: "bob", amount: -change })
  .commit();
在原子事务的一部分中排队新消息——只有当整个事务成功时,才会将它们排队。

您还可以从 listenQueue 处理程序更新 Deno KV 状态。例如,如果您希望确保仅对每条消息执行一次更新,您也可以使用队列 API 与 KV 原子事务。

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  const nonce = await db.get(["nonces", msg.nonce]);
  if (nonce.value === null) {
    // This messaged was already processed.
    return;
  }

  const change = msg.change;
  const bob = await db.get(["balance", "bob"]);
  const liz = await db.get(["balance", "liz"]);

  const success = await db.atomic()
    // Ensure this message was not yet processed
    .check({ key: nonce.key, versionstamp: nonce.versionstamp })
    .delete(nonce.key)
    .sum(["processed_count"], 1n)
    .check(bob, liz) // balances did not change
    .set(["balance", "bob"], bob.value - change)
    .set(["balance", "liz"], liz.value + change)
    .commit();
});

const nonce = crypto.randomUUID();
await db
  .atomic()
  .check({ key: ["nonces", nonce], versionstamp: null })
  .enqueue({ nonce, change: 10 })
  .set(["nonces", nonce], true)
  .sum(["enqueued_count"], 1n)
  .commit();
此示例使用 KV 原子事务确保每条消息仅更新一次。

此外,如果您的 listenQueue 处理程序抛出异常,运行时将自动尝试再次调用处理程序,直到它成功或达到最大重试次数。如果达到最大尝试次数(当前默认值为 5),则会丢弃消息。

用例和示例

队列通过允许服务器卸载异步进程和安排未来的工作来帮助扩展应用程序。

以下是一些示例。

计划的电子邮件通知

有时,由用户启动的作业或任务可能需要足够的时间,以至于您不想让他们等待“任务完成”响应,或者不需要向他们发送响应。此时,您可以将工作卸载到队列,以保持服务器或应用程序对用户的响应能力。

以下是如何使用队列发送电子邮件通知

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  if (msg.type === "welcome_email") {
    await sendWelcomeEmail(msg.customer_id);
  } else if (msg.type === "survey_email") {
    await sendSurveyEmail(msg.customer_id);
  }
});

await db.enqueue(
  { type: "welcome_email", customer_id: 123 },
);

await db.enqueue(
  { type: "survey_email", customer_id: 123 },
  { delay: 259200000 }, // deliver in 3 days
);

可靠的 Webhook 处理

在网络上使用队列的另一个非常常见的示例是通过处理 Webhook。以下是如何使用 Oak 和队列异步处理 Webhook 的示例

import { Application, Router } from "https://deno.land/x/[email protected]/mod.ts";

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  await processWebHook(msg.webhook_body);
});

const router = new Router();
router.post("/webhook", async (ctx) => {
  db.enqueue({ webhook_body: await ctx.request.body().value });
  ctx.response.status = 200;
});

const app = new Application();
app.use(router.routes());
app.use(router.allowedMethods());

await app.listen({ port: 8000 });

Slack 提醒机器人

队列非常适合在 Discord 或 Slack 中构建机器人。

这是一个示例,它使用 Deno 队列在 Slack 中创建一个简单的提醒应用程序。

Receiving a reminder in Slack

并且 这是一个 Discord 机器人,它使用 Deno 队列来创建赠品,并允许用户只需单击一下即可加入。

Creating a giveaway in Discord

更多示例

可以在 docs.deno.com 上找到更多关于队列用法的示例。

Deno 队列的定价

在探索队列的功能时,了解成本影响非常重要。队列本身没有特定的成本,而是按照 Deno KV 操作和 Deno Deploy 请求(用于监听)来收费。具体来说

排队消息:每个排队操作都会转换为一个 KV 写入操作。

接收消息:每条接收的消息都会涉及一次 KV 写入,以及一次单独的请求费用。

这种透明的定价结构确保您只为使用的操作付费,这符合我们对效率和简洁性的承诺。

其他资源

下一步

在网络上构建可扩展的应用程序和服务器需要将后台任务卸载到队列中。然而,在配置它们以供使用时,需要很多步骤。Deno Queues 内置于运行时,并建立在 Deno Deploy 的强大基础设施之上,使您只需几行代码即可使用无服务器、分布式队列。

Deno Queues 与 Deno KVWeb 标准 APInpm一体化现代工具 相结合,成为构建 Web 应用程序的关键基石,使开发更简单、更高效。我们距离目标还有很长的路要走,路线图上还有更多令人兴奋的功能。敬请期待。

我们始终欢迎您的反馈和功能请求!请随时 加入我们不断壮大的 Discord 频道在此创建问题