技術共有

Egg.jsはメッセージキューrabbitMQを使用します

2024-07-12

한어Русский языкEnglishFrançaisIndonesianSanskrit日本語DeutschPortuguêsΕλληνικάespañolItalianoSuomalainenLatina

1.egg-amqplib: RabbitMQ メッセージ キューのカプセル化に基づくライブラリ

インストール:

npm i egg-amqplib --save

導入

  1. // {app_root}/config/plugin.js
  2. exports.amqplib = {
  3. enable: true,
  4. package: 'egg-amqplib',
  5. };

設定

  1. // {app_root}/config/config.default.js
  2. exports.amqplib = {
  3. client: {
  4. // url: 'amqp://localhost',
  5. connectOptions: {
  6. protocol: 'amqp',
  7. hostname: 'localhost',
  8. port: 5672,
  9. username: 'guest',
  10. password: 'guest',
  11. locale: 'en_US',
  12. frameMax: 0,
  13. heartbeat: 0,
  14. vhost: '/',
  15. },
  16. // socketOptions: {
  17. // cert: certificateAsBuffer, // client cert
  18. // key: privateKeyAsBuffer, // client key
  19. // passphrase: 'MySecretPassword', // passphrase for key
  20. // ca: [caCertAsBuffer], // array of trusted CA certs
  21. // },
  22. },
  23. };

github を表示:https://github.com/zubinzhang/egg-amqplib

制御層:

  1. 'use strict';
  2. const Controller = require('egg').Controller;
  3. const queueName = 'test';
  4. class HomeController extends Controller {
  5. async publish() {
  6. const { msg } = this.ctx.query;
  7. const ch = await this.app.amqplib.createChannel();
  8. await ch.assertQueue(queueName, { durable: false });
  9. const ok = await ch.sendToQueue(queueName, Buffer.from(msg));
  10. await ch.close();
  11. this.ctx.body = ok;
  12. this.ctx.status = 200;
  13. }
  14. async consume() {
  15. const ch = await this.app.amqplib.createChannel();
  16. await ch.assertQueue(queueName, { durable: false });
  17. const msg = await new Promise(resolve => ch.consume(queueName, msg => resolve(msg)));
  18. if (msg !== null) {
  19. ch.ack(msg);
  20. await ch.close();
  21. this.ctx.status = 200;
  22. this.ctx.body = { msg: msg.content.toString() };
  23. } else {
  24. this.ctx.status = 500;
  25. }
  26. }
  27. }
  28. module.exports = HomeController;

ルーティング:

  1. 'use strict';
  2. module.exports = app => {
  3. const { router, controller } = app;
  4. router.get('/publish', controller.home.publish);
  5. router.get('/consume', controller.home.consume);
  6. };

参照する:egg-amqplib/test/fixtures/apps/amqplib-test/app/controller/home.js マスター · zubinzhang/egg-amqplib · GitHub

2. Rabbitmq のインストール docker を使用して RabbitMQ をインストールできます。

docker run --name rabbitmq -p 5672:567. -p 15672:15672 rabbitmq:3-management

アクセスアドレス:http://localhost:15672

デフォルトのアカウントパスワードは次のとおりです: guest : guest

管理者の作成:

3. キュー: 1 対 1

 

P はプロデューサーです > 中央のボックスはキューで、コンシューマーが保持するメッセージ バッファを表します > C はコンシューマーです

  1. 'use strict';
  2. const Controller = require('egg').Controller;
  3. /**
  4. * 一对一队列演示
  5. */
  6. // 频道名称
  7. const queueName = 'hasone'
  8. class UserController extends Controller {
  9. // 生成者
  10. async send() {
  11. // 1. 获取要发送的消息
  12. const { msg } = this.ctx.query
  13. // 2. 创建频道
  14. const ch = await this.app.amqplib.createChannel();
  15. // 3. 创建队列 durable 关闭持久化存储
  16. await ch.assertQueue(queueName, { durable: false } );
  17. // 4. 发送消息
  18. const ok = await ch.sendToQueue(queueName, Buffer.from(msg));
  19. // 5. 关闭连接
  20. await ch.close();
  21. this.ctx.body = ok;
  22. this.ctx.status = 200;
  23. }
  24. // 消费者
  25. async work() {
  26. // 1. 创建频道
  27. const ch = await this.app.amqplib.createChannel();
  28. // 2. 选择队列
  29. await ch.assertQueue(queueName, { durable: false });
  30. //3. 接收队列的消息
  31. const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => resolve(msg), { noAck: true }));
  32. // 4. 显示消息内容
  33. if (resultMsg !== null) {
  34. ch.ack(resultMsg);
  35. await ch.close();
  36. const { content } = resultMsg;
  37. this.status = 200;
  38. this.ctx.body = { msg: content.toString() }
  39. } else {
  40. this.ctx.body = '队列消费失败'
  41. this.ctx.status = 500;
  42. }
  43. }
  44. }
  45. module.exports = UserController;

4. キュー: 1 対多

  1. 'use strict';
  2. const Controller = require('egg').Controller;
  3. /**
  4. * 队列一对多演示
  5. * 生产者 ----> 队列 ----> 消费者
  6. * ----> 消费者
  7. ----> 消费者
  8. */
  9. // 频道名称
  10. const queueName = 'hasMany'
  11. class UserController extends Controller {
  12. // 生成者
  13. async send() {
  14. const { msg } = this.ctx.query;
  15. //1. 创建频道
  16. const ch = await this.app.amqplib.createChannel();
  17. // 2. 创建队列 开启持久化存储
  18. await ch.assertQueue(queueName, { durable: true });
  19. // 3. 发送消息
  20. let ok = null;
  21. for(let i=0; i<50; i++) {
  22. // 此时我们确信即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将消息标记为持久性 - 通过使用持久性选项Channel.sendToQueue。
  23. ok = await ch.sendToQueue(queueName, Buffer.from(msg+i), { persistent: true });
  24. }
  25. //4. 关闭连接
  26. await ch.close();
  27. this.ctx.body = ok;
  28. this.ctx.status = 200;
  29. }
  30. // 消费者
  31. async work1() {
  32. // 1. 创建频道
  33. const ch = await this.app.amqplib.createChannel();
  34. //2. 选择队列
  35. await ch.assertQueue(queueName, { durable: true });
  36. // 3. 接收消息 noAck 关闭消息自动确认模式,需要手动 ack
  37. const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
  38. setTimeout(() => {
  39. resolve(msg)
  40. }, 500)
  41. }, { noAck: false }) );
  42. if (resultMsg !== null) {
  43. const { content } = resultMsg;
  44. //消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
  45. ch.ack(resultMsg);
  46. await ch.close();
  47. this.ctx.body = { work1: content.toString() };
  48. this.ctx.status = 200;
  49. } else {
  50. this.ctx.body = '消费者1号失败'
  51. this.ctx.status = 500
  52. }
  53. }
  54. async work2() {
  55. // 1. 创建频道
  56. const ch = await this.app.amqplib.createChannel();
  57. //2. 选择队列 RabbitMQ永远不会丢失我们的队列。为此,我们需要声明它是持久的
  58. await ch.assertQueue(queueName, { durable: true });
  59. // 3. 接收消息 noAck 开启自动确认模式
  60. const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
  61. setTimeout(() => {
  62. resolve(msg)
  63. }, 1000)
  64. }, { noAck: false }) );
  65. if (resultMsg !== null) {
  66. const { content } = resultMsg;
  67. ch.ack(resultMsg);
  68. await ch.close();
  69. this.ctx.body = { work2: content.toString() };
  70. this.ctx.status = 200;
  71. } else {
  72. this.ctx.body = '消费者2号失败'
  73. this.ctx.status = 500
  74. }
  75. }
  76. async work3() {
  77. // 1. 创建频道
  78. const ch = await this.app.amqplib.createChannel();
  79. //2. 选择队列
  80. await ch.assertQueue(queueName, { durable: true });
  81. // 3. 接收消息 noAck 开启自动确认模式
  82. const resultMsg = await new Promise(resolve => ch.consume(queueName, msg => {
  83. setTimeout(() => {
  84. resolve(msg)
  85. }, 1500)
  86. }, { noAck: false }) );
  87. if (resultMsg !== null) {
  88. const { content } = resultMsg;
  89. //消费者发回ack(nowledgement)告诉RabbitMQ已收到,处理了特定消息,RabbitMQ可以自由删除它
  90. ch.ack(resultMsg);
  91. await ch.close();
  92. this.ctx.body = { work3: content.toString() };
  93. this.ctx.status = 200;
  94. } else {
  95. this.ctx.body = '消费者3号失败'
  96. this.ctx.status = 500
  97. }
  98. }
  99. }
  100. module.exports = UserController;