跳至主要内容
Announcing Deno Queues

在不断发展的云软件世界中,Deno 旨在从根本上进行简化。传统上,利用公共云基础设施需要筛选大量的样板代码和复杂的配置,这通常会占用开发人员大量的时间和精力。我们的目标是将这些复杂性提炼成用户友好的原语,使开发人员能够以无与伦比的速度设计、改进和启动他们的项目。

观看我们的 Deno 队列发布公告。

考虑到这一点,我们在几个月前推出了 Deno KV目前处于公开测试阶段)。Deno KV 基于 FoundationDB 的强大功能,不仅仅是应用程序的一种新的持久化选项。它还通过消除冗余配置并提供令人耳目一新的简化 API 来转变开发人员的体验。

在这个基础之上(双关语),我们很高兴今天发布 Deno 队列。该工具旨在彻底改变可扩展消息传递,并提升应用程序中后台处理的管理水平。

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  await postToSlack(msg.channel, msg.text);
});

await db.enqueue({ channel: "C123456", text: "Slack message" }, {
  delay: 60000,
});

在这篇文章中,我们将介绍 Deno 队列的关键方面

什么是 Deno 队列?

Deno 队列基于 Deno KV 构建,允许您卸载应用程序的某些部分或安排未来的工作以异步运行,并提供两个新的简单 API,无需配置或维护基础设施

  • .enqueue():将新消息推送到队列中,以保证立即或在未来的某个时间交付。
  • .listenQueue():用于处理来自队列的新消息的处理程序。

由于队列基于 Deno KV 构建,因此在本地运行时使用 SQLite,在 Deno Deploy 上运行时使用 FoundationDB,以实现最大的可用性和吞吐量。

在 Deno Deploy 上运行队列针对性能进行了优化。Deno Deploy 会自动按需启动 V8 隔离,并在消息可用时分派消息以进行处理。您的应用程序代码只需使用 listenQueue 处理程序监听新消息,Deno Deploy 会处理其余的事情。

Deno 队列保证至少一次交付。 对于大多数入队的消息,listenQueue 处理程序将被调用一次。在某些故障情况下,处理程序可能会被多次调用以确保交付。重要的是要设计您的应用程序,以确保正确处理重复消息。

您还可以将 队列与 KV 原子事务 原语结合使用,这可以解锁强大的工作流程。例如,您可以将消息添加到队列中,作为 KV 事务的一部分,该事务会原子性地成功或失败

const kv = await Deno.openKv();
const change = 10;

const bob = await kv.get(["balance", "bob"]);
const liz = await kv.get(["balance", "liz"]);
if (bob.value < change) {
  throw "not enough balance";
}

const success = await kv.atomic()
  .check(bob, liz) // balances did not change
  .set(["balance", "bob"], bob.value - change)
  .set(["balance", "liz"], liz.value + change)
  // Enqueue a message to notify Liz and Bob
  .enqueue({ type: "notify", name: "liz", amount: change })
  .enqueue({ type: "notify", name: "bob", amount: -change })
  .commit();
将新消息入队作为原子事务的一部分 — 只有当整个事务成功时,它们才会被入队。

您还可以从 listenQueue 处理程序更新 Deno KV 状态。例如,如果您想确保每个消息的更新只执行一次,您也可以将队列 API 与 KV 原子事务一起使用

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  const nonce = await db.get(["nonces", msg.nonce]);
  if (nonce.value === null) {
    // This messaged was already processed.
    return;
  }

  const change = msg.change;
  const bob = await db.get(["balance", "bob"]);
  const liz = await db.get(["balance", "liz"]);

  const success = await db.atomic()
    // Ensure this message was not yet processed
    .check({ key: nonce.key, versionstamp: nonce.versionstamp })
    .delete(nonce.key)
    .sum(["processed_count"], 1n)
    .check(bob, liz) // balances did not change
    .set(["balance", "bob"], bob.value - change)
    .set(["balance", "liz"], liz.value + change)
    .commit();
});

const nonce = crypto.randomUUID();
await db
  .atomic()
  .check({ key: ["nonces", nonce], versionstamp: null })
  .enqueue({ nonce, change: 10 })
  .set(["nonces", nonce], true)
  .sum(["enqueued_count"], 1n)
  .commit();
此示例使用 KV 原子事务来确保每个消息仅更新一次。

此外,如果您的 listenQueue 处理程序抛出异常,运行时将自动重试再次调用该处理程序,直到它成功或达到最大重试次数。如果达到最大尝试次数(当前默认值为 5),则该消息将被丢弃。

用例和示例

队列通过允许服务器卸载异步进程和安排未来的工作,在扩展应用程序中非常有用。

以下是一些示例。

计划的电子邮件通知

有时,用户发起的工作或任务可能需要足够长的时间,以至于您不想让他们等待“任务完成”响应,或者不需要向他们发送响应。这时您可以将工作卸载到队列中,以保持您的服务器或应用程序对用户的响应速度。

以下是如何使用队列发送电子邮件通知的方法

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  if (msg.type === "welcome_email") {
    await sendWelcomeEmail(msg.customer_id);
  } else if (msg.type === "survey_email") {
    await sendSurveyEmail(msg.customer_id);
  }
});

await db.enqueue(
  { type: "welcome_email", customer_id: 123 },
);

await db.enqueue(
  { type: "survey_email", customer_id: 123 },
  { delay: 259200000 }, // deliver in 3 days
);

可靠的 Webhook 处理

在 Web 上使用队列的另一个非常常见的例子是通过处理 webhook。这是一个使用 Oak 和队列异步处理 webhook 的示例

import { Application, Router } from "https://deno.land/x/[email protected]/mod.ts";

const db = await Deno.openKv();

db.listenQueue(async (msg) => {
  await processWebHook(msg.webhook_body);
});

const router = new Router();
router.post("/webhook", async (ctx) => {
  db.enqueue({ webhook_body: await ctx.request.body().value });
  ctx.response.status = 200;
});

const app = new Application();
app.use(router.routes());
app.use(router.allowedMethods());

await app.listen({ port: 8000 });

Slack 提醒机器人

队列非常适合在 Discord 或 Slack 中构建机器人。

这是一个示例,展示了如何使用 Deno 队列在 Slack 中创建一个简单的提醒应用程序。

Receiving a reminder in Slack

这是一个 Discord 机器人,它使用 Deno 队列来创建赠品并允许用户一键加入。

Creating a giveaway in Discord

更多示例

有关队列用法的更多示例,请访问 docs.deno.com

Deno 队列的定价

当您探索队列的功能时,了解成本影响非常重要。队列本身没有特定的成本,而是根据 Deno KV 操作和 Deno Deploy 请求(用于监听)收费。具体来说:

入队消息:每个入队操作都转化为 KV 写入操作。

接收消息:每条接收到的消息都需要一次 KV 写入和一次请求费用。

这种透明的定价结构确保您仅为您使用的操作付费,这与我们对效率和简洁的承诺相符。

其他资源

下一步是什么

在 Web 上构建可扩展的应用程序和服务器需要将后台任务卸载到队列。但是,配置它们以供使用需要许多步骤。Deno 队列内置于运行时,并构建在 Deno Deploy 的强大基础设施之上,让您只需几行代码即可使用无服务器分布式队列。

Deno 队列与 Deno KVWeb 标准 APInpm一体化现代工具 一起,成为使 Web 创建更简单、更高效的关键构建块。 离我们的目标还很远,我们还有许多令人兴奋的功能正在规划中。敬请关注。

我们始终乐于接受反馈和功能请求!欢迎加入我们不断壮大的 Discord 社区在此处创建 issue