The right way to open task queues and asynchronous interfaces (.NET Core version)


What is an asynchronous interface?

Asynchronous Operations

Certain types of operations might require processing of the request in an asynchronous manner (e.g. validating a bank account, processing an image, etc.) in order to avoid long delays on the client side and prevent long-standing open client connections waiting for the operations to complete. For such use cases, APIs MUST employ the following pattern:

For POST requests:

  • Return the 202 Accepted HTTP response code.
  • In the response body, include one or more URIs as hypermedia links, which could include:
    • The final URI of the resource where it will be available in future if the ID and path are already known. Clients can then make an HTTP GET request to that URI in order to obtain the completed resource. Until the resource is ready, the final URI SHOULD return the HTTP status code 404 Not Found.
      `{ "rel": "self", "href": "/v1/namespace/resources/{resource_id}", "method": "GET" }`
    • A temporary request queue URI where the status of the operation may be obtained via some temporary identifier. Clients SHOULD make an HTTP `GET` request to obtain the status of the operation which MAY include such information as completion state, ETA, and final URI once it is completed.
      `{ "rel": "self", "href": "/v1/queue/requests/{request_id}", "method": "GET" }`
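
To make the POST pattern above a bit more concrete, here is a minimal sketch of what a service might hand back for a queued request. The `AcceptedResponse` class and its members are hypothetical names made up for illustration; they are not part of the PayPal guideline or of any framework.

```csharp
using System.Collections.Generic;

// Hypothetical helper, for illustration only.
public static class AcceptedResponse
{
    // HTTP status code returned when a request is accepted for asynchronous processing.
    public const int StatusCode = 202;

    // Builds the hypermedia link to the temporary request queue resource,
    // where the client can poll for the status of the operation.
    public static Dictionary<string, string> QueueLink(string requestId)
    {
        return new Dictionary<string, string>
        {
            { "rel", "self" },
            { "href", "/v1/queue/requests/" + requestId },
            { "method", "GET" }
        };
    }

    // Status code the final resource URI answers with:
    // 404 until the resource exists, 200 once it is ready.
    public static int FinalResourceStatus(bool resourceReady)
    {
        return resourceReady ? 200 : 404;
    }
}
```

A controller would return `AcceptedResponse.StatusCode` together with the serialized link; how the link is serialized (Newtonsoft.Json, System.Text.Json, ...) is left open here.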

For PUT/PATCH/DELETE/GET requests:

Like POST, you can support PUT/PATCH/DELETE/GET to be asynchronous. The behaviour would be as follows:

  • Return the 202 Accepted HTTP response code.
  • In the response body, include one or more URIs as hypermedia links, which could include:
    • A temporary request queue URI where the status of the operation may be obtained via some temporary identifier. Clients SHOULD make an HTTP GET request to obtain the status of the operation which MAY include such information as completion state, ETA, and final URI once it is completed.
      `{ "rel": "self", "href": "/v1/queue/requests/{request_id}", "method": "GET" }`

APIs that support both synchronous and asynchronous processing for an URI:

APIs that support both synchronous and asynchronous operations for a particular URI and an HTTP method combination, MUST recognize the Prefer header and exhibit following behavior:

  • If the request contains a Prefer=respond-async header, the service MUST switch the processing to asynchronous mode.
  • If the request doesn't contain a Prefer=respond-async header, the service MUST process the request synchronously.
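
As a rough sketch of the rule above: the Prefer header (RFC 7240) may carry several comma-separated preferences, for example `respond-async, wait=10`, so a simple string comparison can miss it. The `PreferHeader` class below is a hypothetical helper, not something taken from the PayPal document or from ASP.NET Core, and it treats the preference name case-insensitively.

```csharp
using System;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class PreferHeader
{
    // Returns true when the raw Prefer header value contains the "respond-async" preference.
    public static bool WantsAsync(string preferHeaderValue)
    {
        if (string.IsNullOrWhiteSpace(preferHeaderValue))
        {
            return false; // no Prefer header: process synchronously
        }

        return preferHeaderValue
            .Split(',')                                 // one entry per preference
            .Select(p => p.Split(';')[0].Trim())        // drop optional parameters after ';'
            .Any(token => string.Equals(token, "respond-async",
                                        StringComparison.OrdinalIgnoreCase));
    }
}
```

With this helper, `PreferHeader.WantsAsync("respond-async, wait=10")` selects the asynchronous path, while a missing header or `return=minimal` keeps the request synchronous.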

It is desirable that all APIs that implement asynchronous processing, also support webhooks as a mechanism of pushing the processing status to the client.

Quoted from: paypal/API Design Patterns And Use Cases: asynchronous-operations

In plain terms

  • Simply put: when a request arrives, the service immediately returns the corresponding resource_id/request_id, and the client can later query the processing result with that ID.
  • The work may be put on a queue, or it may simply be processed asynchronously right away.
  • If processing has not finished yet, the query returns 404; once processing is complete, it returns the data as normal.
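
From the client's point of view, the three points above boil down to a polling loop. Here is a minimal sketch, assuming the status endpoint answers 404 while the work is pending and 200 with the data once it is done. `PollingClient`, the `fetch` delegate (a stand-in for the real HTTP GET) and the `pause` callback are all hypothetical names used only for illustration.

```csharp
using System;

// Hypothetical client-side helper, for illustration only.
public static class PollingClient
{
    // fetch:  stand-in for an HTTP GET on the status URI; returns (status code, body).
    // pause:  called between attempts, e.g. a Thread.Sleep or a backoff delay.
    // Returns the body once the service answers 200, or null if it is still
    // pending after maxAttempts polls.
    public static string WaitForResult(
        string requestId,
        Func<string, Tuple<int, string>> fetch,
        int maxAttempts,
        Action pause)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            Tuple<int, string> response = fetch(requestId);
            if (response.Item1 == 200)
            {
                return response.Item2; // processing finished
            }
            if (response.Item1 != 404)
            {
                throw new InvalidOperationException(
                    "Unexpected status code: " + response.Item1);
            }
            pause(); // 404: not ready yet, wait and try again
        }
        return null;
    }
}
```

Passing the HTTP call in as a delegate keeps the loop easy to test; in a real client the delegate would wrap an `HttpClient` call.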

There doesn't seem to be much more to say...

So let's build the whole thing.

Sample code section!

Implementation logic

  • Create the task: generate a "request-id" and store it in the corresponding Redis zset (sorted set) queue.
  • At the same time, publish a task message to a Redis channel; the backend task processing service consumes the message on its own (producer-consumer pattern).
  • After the task processing service has handled the message, it writes the result to Redis with the request-id as the key and the result as the value, then removes that request-id from the Redis zset.
  • Fetch the result for a request-id: if a processing result can be found for the request-id, return the processed data directly; if the request-id is still in the sorted set queue, return 404 plus its position n, meaning it is still being processed and there are n requests ahead of it.
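
Before bringing Redis into it, the four steps above can be sketched with plain in-memory collections. This is only a toy model under simplifying assumptions: a `List<string>` stands in for the zset, a `Queue<string>` for the channel and a `Dictionary<string, string>` for the stored results. The `InMemoryTaskQueue` class and its method names are hypothetical and do not appear in the sample project.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of the flow, for illustration only (no Redis involved).
public class InMemoryTaskQueue
{
    private readonly List<string> _pending = new List<string>();   // stands in for the zset
    private readonly Queue<string> _messages = new Queue<string>(); // stands in for the channel
    private readonly Dictionary<string, string> _results = new Dictionary<string, string>();

    // Steps 1 and 2: register the request-id and emit a task message.
    // Returns the position of the request in the pending queue.
    public int AddRequest(string requestId)
    {
        int index = _pending.IndexOf(requestId);
        if (index < 0)
        {
            _pending.Add(requestId);
            _messages.Enqueue(requestId);
            index = _pending.Count - 1;
        }
        return index;
    }

    // Step 3: the worker handles one message, stores the result under the
    // request-id and removes the request-id from the pending queue.
    public bool ProcessNext(Func<string, string> handler)
    {
        if (_messages.Count == 0)
        {
            return false;
        }
        string requestId = _messages.Dequeue();
        _results[requestId] = handler(requestId);
        _pending.Remove(requestId);
        return true;
    }

    // Step 4: 200 plus the result when it exists, otherwise 404 plus the
    // position in the pending queue (-1 if the request-id is unknown).
    public int GetRequest(string requestId, out string result, out int position)
    {
        position = _pending.IndexOf(requestId);
        return _results.TryGetValue(requestId, out result) ? 200 : 404;
    }
}
```

Adding "a" and then "b" gives positions 0 and 1; after one `ProcessNext` call, "a" answers 200 with its result and "b" moves up to position 0 while still answering 404.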

The sequence diagram looks something like this:

Happy coding time

// RequestService.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CorrelationId;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using static StackExchange.Redis.RedisChannel;

namespace MTQueue.Service
{
    public class RequestService
    {


        private readonly ICorrelationContextAccessor _correlationContext;

        private readonly ConnectionMultiplexer _redisMultiplexer;

        private readonly IServiceProvider _services;

        private readonly ILogger<RequestService> _logger;

        public RequestService(ICorrelationContextAccessor correlationContext,
        ConnectionMultiplexer redisMultiplexer, IServiceProvider services,
        ILogger<RequestService> logger)
        {
            _correlationContext = correlationContext;
            _redisMultiplexer = redisMultiplexer;
            _services = services;
            _logger = logger;
        }

        public long? AddRequest(JToken data)
        {
            var requestId = _correlationContext.CorrelationContext.CorrelationId;
            var redisDB = _redisMultiplexer.GetDatabase(CommonConst.DEFAULT_DB);
            var index = redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);
            if (index == null)
            {
                data["requestId"] = requestId;
                redisDB.SortedSetAdd(CommonConst.REQUESTS_SORT_SETKEY, requestId, GetTotalSeconds());
                PushRedisMessage(data.ToString());
            }
            return redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);
        }

        public static long GetTotalSeconds()
        {
            // Unix time in seconds, used as the sorted set score so that earlier requests rank first.
            return DateTimeOffset.UtcNow.ToUnixTimeSeconds();
        }

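        private void PushRedisMessage(string message)
        {
            // Fire-and-forget publish. The await inside the lambda makes sure a failed
            // publish ends up in the catch block instead of being silently dropped.
            Task.Run(async () =>
            {
                try
                {
                    using (var scope = _services.CreateScope())
                    {
                        var multiplexer = scope.ServiceProvider.GetRequiredService<ConnectionMultiplexer>();
                        await multiplexer.GetSubscriber().PublishAsync(CommonConst.REQUEST_CHANNEL, message);
                    }
                }
                catch (Exception ex)
                {
                    _logger.LogError(-1, ex, "Publish failed, message: {Message}", message);
                }
            });
        }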

        public Tuple<JToken, long?> GetRequest(string requestId)
        {
            var redisDB = _redisMultiplexer.GetDatabase(CommonConst.DEFAULT_DB);
            var keyIndex = redisDB.SortedSetRank(CommonConst.REQUESTS_SORT_SETKEY, requestId);
            var response = redisDB.StringGet(requestId);
            if (response.IsNull)
            {
                return Tuple.Create<JToken, long?>(default(JToken), keyIndex);
            }
            return Tuple.Create<JToken, long?>(JToken.Parse(response), keyIndex);
        }

    }
}
// RedisMQListener.cs

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MTQueue.Model;
using MTQueue.Service;
using Newtonsoft.Json.Linq;
using StackExchange.Redis;
using static StackExchange.Redis.RedisChannel;

namespace MTQueue.Listener
{
    public class RedisMQListener : IHostedService
    {
        private readonly ConnectionMultiplexer _redisMultiplexer;

        private readonly IServiceProvider _services;

        private readonly ILogger<RedisMQListener> _logger;

        public RedisMQListener(IServiceProvider services, ConnectionMultiplexer redisMultiplexer,
        ILogger<RedisMQListener> logger)
        {
            _services = services;
            _redisMultiplexer = redisMultiplexer;
            _logger = logger;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            Register();
            return Task.CompletedTask;
        }


        public virtual bool Process(RedisChannel ch, RedisValue message)
        {
            _logger.LogInformation("Process start,message: " + message);
            var redisDB = _services.GetRequiredService<ConnectionMultiplexer>()
            .GetDatabase(CommonConst.DEFAULT_DB);
            var messageJson = JToken.Parse(message);
            var requestId = messageJson["requestId"]?.ToString();
            if (string.IsNullOrEmpty(requestId))
            {
                _logger.LogWarning("requestId not in message.");
                return false;
            }
            var mtAgent = _services.GetRequiredService<ZhihuClient>();
            var text = mtAgent.GetZhuanlan(messageJson);
            redisDB.StringSet(requestId, text.ToString(), CommonConst.RESPONSE_TS);
            _logger.LogInformation("Process finish,requestId:" + requestId);
            redisDB.SortedSetRemove(CommonConst.REQUESTS_SORT_SETKEY, requestId);
            return true;
        }


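        public void Register()
        {
            var sub = _redisMultiplexer.GetSubscriber();
            var channel = CommonConst.REQUEST_CHANNEL;
            // Subscribe synchronously so a failed subscription surfaces at startup.
            sub.Subscribe(channel, (ch, value) =>
            {
                try
                {
                    Process(ch, value);
                }
                catch (Exception ex)
                {
                    _logger.LogError(-1, ex, "Process failed, message: {Message}", (string)value);
                }
            });
        }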

        public void DeRegister()
        {
            // Stop receiving task messages on the request channel.
            _redisMultiplexer.GetSubscriber().Unsubscribe(CommonConst.REQUEST_CHANNEL);
        }


        public Task StopAsync(CancellationToken cancellationToken)
        {
            DeRegister();
            return Task.CompletedTask;
        }
    }

}
// RequestsController.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using CorrelationId;
using Microsoft.AspNetCore.Mvc;
using MTQueue.Service;
using Newtonsoft.Json.Linq;

namespace MTQueue.Controllers
{
    [Route("v1/[controller]")]
    [ApiController]
    public class RequestsController : ControllerBase
    {

        private readonly ICorrelationContextAccessor _correlationContext;

        private readonly RequestService _requestService;

        private readonly ZhihuClient _mtAgentClient;

        public RequestsController(ICorrelationContextAccessor correlationContext,
         RequestService requestService, ZhihuClient mtAgentClient)
        {
            _correlationContext = correlationContext;
            _requestService = requestService;
            _mtAgentClient = mtAgentClient;
        }



        [HttpGet("{requestId}")]
        public IActionResult Get(string requestId)
        {
            var result = _requestService.GetRequest(requestId);
            var resource = $"/v1/requests/{requestId}";
            if (result.Item1 == default(JToken))
            {
                return NotFound(new { rel = "self", href = resource, method = "GET", index = result.Item2 });
            }
            return Ok(result.Item1);
        }

        [HttpPost]
        public IActionResult Post([FromBody] JToken data, [FromHeader(Name = "Prefer")]string prefer)
        {
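            // Trim and ignore case; this only matches a Prefer header whose sole preference is "respond-async".
            if (string.Equals(prefer?.Trim(), "respond-async", StringComparison.OrdinalIgnoreCase))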
            {
                var index = _requestService.AddRequest(data);
                var requestId = _correlationContext.CorrelationContext.CorrelationId;
                var resource = $"/v1/requests/{requestId}";
                return Accepted(resource, new { rel = "self", href = resource, method = "GET", index = index });
            }
            return Ok(_mtAgentClient.GetZhuanlan(data));
        }
    }
}

For the full code, see: https://github.com/liguobao/TaskQueueSample

