秋霞步兵区国产精品,国产精品视频二区第二页,亚洲aⅴ欧美综合一区二区三区,亚洲日韩欧美一区二区不卡

      1. <small id="x8tpb"></small>
        <address id="x8tpb"></address>

        新疆信息港歡迎您!

        新疆信息港
        新疆信息港 > 財(cái)經(jīng) >ASP.NET Core 3.x 并發(fā)限制!

        ASP.NET Core 3.x 并發(fā)限制!

        2020-11-20 04:44:16
        來(lái)源:互聯(lián)網(wǎng)
        閱讀:-

        前言Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于傳入的請(qǐng)求進(jìn)行排隊(duì)處理,避免線程池的不足....

        前言

        Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于傳入的請(qǐng)求進(jìn)行排隊(duì)處理,避免線程池的不足.
        我們?nèi)粘i_(kāi)發(fā)中可能常做的給某web服務(wù)器配置連接數(shù)以及,請(qǐng)求隊(duì)列大小,那么今天我們看看如何在通過(guò)中間件形式實(shí)現(xiàn)一個(gè)并發(fā)量以及隊(duì)列長(zhǎng)度限制.

        Queue策略

        添加Nuget

        Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

        Copy public void ConfigureServices(IServiceCollection services)
        {
        services.AddQueuePolicy(options =>
        {
        //最大并發(fā)請(qǐng)求數(shù)
        options.MaxConcurrentRequests = 2;
        //請(qǐng)求隊(duì)列長(zhǎng)度限制
        options.RequestQueueLimit = 1;
        });
        services.AddControllers();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
        //添加并發(fā)限制中間件
        app.UseConcurrencyLimiter();
        app.Run(async context =>
        {
        Task.Delay(100).Wait(); // 100ms sync-over-async

        await context.Response.WriteAsync("Hello World!");
        });
        if (env.IsDevelopment())
        {
        app.UseDeveloperExceptionPage();
        }

        app.UseHttpsRedirection();

        app.UseRouting();

        app.UseAuthorization();

        app.UseEndpoints(endpoints =>
        {
        endpoints.MapControllers();
        });
        }

        通過(guò)上面簡(jiǎn)單的配置,我們就可以將他引入到我們的代碼中,從而做并發(fā)量限制,以及隊(duì)列的長(zhǎng)度;那么問(wèn)題來(lái)了,他是怎么實(shí)現(xiàn)的呢?

        Copy public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action configure){ services.Configure(configure); services.AddSingleton();
        return services;
        }

        QueuePolicy采用的是SemaphoreSlim信號(hào)量設(shè)計(jì),SemaphoreSlim、Semaphore(信號(hào)量)支持并發(fā)多線程進(jìn)入被保護(hù)代碼,對(duì)象在初始化時(shí)會(huì)指定 最大任務(wù)數(shù)量,當(dāng)線程請(qǐng)求訪問(wèn)資源,信號(hào)量遞減,而當(dāng)他們釋放時(shí),信號(hào)量計(jì)數(shù)又遞增。

        Copy ///  /// 構(gòu)造方法(初始化Queue策略) ///  ///  public QueuePolicy(IOptions options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0)
        {
        throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
        }
        //使用SemaphoreSlim來(lái)限制任務(wù)最大個(gè)數(shù)
        _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
        }

        ConcurrencyLimiterMiddleware中間件

        Copy ///  /// Invokes the logic of the middleware. ///  /// The . /// A  that completes when the request leaves.
        public async Task Invoke(HttpContext context)
        {
        var waitInQueueTask = _queuePolicy.TryEnterAsync();

        // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
        bool result;

        if (waitInQueueTask.IsCompleted)
        {
        ConcurrencyLimiterEventSource.Log.QueueSkipped();
        result = waitInQueueTask.Result;
        }
        else
        {
        using (ConcurrencyLimiterEventSource.Log.QueueTimer())
        {
        result = await waitInQueueTask;
        }
        }

        if (result)
        {
        try
        {
        await _next(context);
        }
        finally
        {
        _queuePolicy.OnExit();
        }
        }
        else
        {
        ConcurrencyLimiterEventSource.Log.RequestRejected();
        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
        await _onRejected(context);
        }
        }

        每次當(dāng)我們請(qǐng)求的時(shí)候首先會(huì)調(diào)用_queuePolicy.TryEnterAsync(),進(jìn)入該方法后先開(kāi)啟一個(gè)私有l(wèi)ock鎖,再接著判斷總請(qǐng)求量是否≥(請(qǐng)求隊(duì)列限制的大小+最大并發(fā)請(qǐng)求數(shù)),如果當(dāng)前數(shù)量超出了,那么我直接拋出,送你個(gè)503狀態(tài);

        Copy if (result)
        {
        try
        {
        await _next(context);
        }
        finally
        {
        _queuePolicy.OnExit();
        }
        }
        else
        {
        ConcurrencyLimiterEventSource.Log.RequestRejected();
        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
        await _onRejected(context);
        }

        問(wèn)題來(lái)了,我這邊如果說(shuō)還沒(méi)到你設(shè)置的大小呢,我這個(gè)請(qǐng)求沒(méi)有給你服務(wù)器造不成壓力,那么你給我處理一下吧.
        await _serverSemaphore.WaitAsync();異步等待進(jìn)入信號(hào)量,如果沒(méi)有線程被授予對(duì)信號(hào)量的訪問(wèn)權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號(hào)量被釋放為止

        Copy lock (_totalRequestsLock)
        {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
        return false;
        }
        TotalRequests++;
        }
        //異步等待進(jìn)入信號(hào)量,如果沒(méi)有線程被授予對(duì)信號(hào)量的訪問(wèn)權(quán)限,則進(jìn)入執(zhí)行保護(hù)代碼;否則此線程將在此處等待,直到信號(hào)量被釋放為止
        await _serverSemaphore.WaitAsync();
        return true;
        }

        返回成功后那么中間件這邊再進(jìn)行處理,_queuePolicy.OnExit();通過(guò)該調(diào)用進(jìn)行調(diào)用_serverSemaphore.Release();釋放信號(hào)燈,再對(duì)總請(qǐng)求數(shù)遞減

        Stack策略

        再來(lái)看看另一種方法,棧策略,他是怎么做的呢?一起來(lái)看看.再附加上如何使用的代碼.

        Copy public void ConfigureServices(IServiceCollection services)
        {
        services.AddStackPolicy(options =>
        {
        //最大并發(fā)請(qǐng)求數(shù)
        options.MaxConcurrentRequests = 2;
        //請(qǐng)求隊(duì)列長(zhǎng)度限制
        options.RequestQueueLimit = 1;
        });
        services.AddControllers();
        }

        通過(guò)上面的配置,我們便可以對(duì)我們的應(yīng)用程序執(zhí)行出相應(yīng)的策略.下面再來(lái)看看他是怎么實(shí)現(xiàn)的呢

        Copy public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action configure) { services.Configure(configure); services.AddSingleton();
        return services;
        }

        可以看到這次是通過(guò)StackPolicy類做的策略.來(lái)一起來(lái)看看主要的方法

        Copy ///  /// 構(gòu)造方法(初始化參數(shù)) ///  ///  public StackPolicy(IOptions options) { //棧分配 _buffer = new List();
        //隊(duì)列大小
        _maxQueueCapacity = options.Value.RequestQueueLimit;
        //最大并發(fā)請(qǐng)求數(shù)
        _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
        //剩余可用空間
        _freeServerSpots = options.Value.MaxConcurrentRequests;
        }

        當(dāng)我們通過(guò)中間件請(qǐng)求調(diào)用,_queuePolicy.TryEnterAsync()時(shí),首先會(huì)判斷我們是否還有訪問(wèn)請(qǐng)求次數(shù),如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執(zhí)行下一步,如果當(dāng)前隊(duì)列=我們?cè)O(shè)置的隊(duì)列大小的話,那我們需要取消先前請(qǐng)求;每次取消都是先取消之前的保留后面的請(qǐng)求;

        Copy public ValueTask TryEnterAsync() { lock (_bufferLock) { if (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } // 如果隊(duì)列滿了,取消先前的請(qǐng)求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity = true; _buffer[_head].Complete(false); _queueLength--; } var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this); _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength < _buffer.Count)
        {
        _buffer[_head] = tcs;
        }
        else
        {
        _buffer.Add(tcs);
        }
        _queueLength++;
        // increment _head for next time
        _head++;
        if (_head == _maxQueueCapacity)
        {
        _head = 0;
        }
        return tcs.GetValueTask();
        }
        }

        當(dāng)我們請(qǐng)求后調(diào)用_queuePolicy.OnExit();出棧,再將請(qǐng)求長(zhǎng)度遞減

        Copy public void OnExit()
        {
        lock (_bufferLock)
        {
        if (_queueLength == 0)
        {
        _freeServerSpots++;

        if (_freeServerSpots > _maxConcurrentRequests)
        {
        _freeServerSpots--;
        throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
        }

        return;
        }

        // step backwards and launch a new task
        if (_head == 0)
        {
        _head = _maxQueueCapacity - 1;
        }
        else
        {
        _head--;
        }
        //退出,出棧
        _buffer[_head].Complete(true);
        _queueLength--;
        }
        }

        總結(jié)

        基于棧結(jié)構(gòu)的特點(diǎn),在實(shí)際應(yīng)用中,通常只會(huì)對(duì)棧執(zhí)行以下兩種操作:

        • 向棧中添加元素,此過(guò)程被稱為"進(jìn)棧"(入棧或壓棧);
        • 從棧中提取出指定元素,此過(guò)程被稱為"出棧"(或彈棧);

        隊(duì)列存儲(chǔ)結(jié)構(gòu)的實(shí)現(xiàn)有以下兩種方式:

        • 順序隊(duì)列:在順序表的基礎(chǔ)上實(shí)現(xiàn)的隊(duì)列結(jié)構(gòu);
        • 鏈隊(duì)列:在鏈表的基礎(chǔ)上實(shí)現(xiàn)的隊(duì)列結(jié)構(gòu);

        推薦閱讀:云南之窗

        免責(zé)聲明:本文僅代表企業(yè)觀點(diǎn),與新疆信息港無(wú)關(guān)。其原創(chuàng)性以及文中陳述文字和內(nèi)容未經(jīng)本站證實(shí),對(duì)本文以及其中全部或者部分內(nèi)容、文字的真實(shí)性、完整性、及時(shí)性本站不作任何保證或承諾,請(qǐng)讀者僅作參考,并請(qǐng)自行核實(shí)相關(guān)內(nèi)容。
        熱門(mén)圖片
        熱門(mén)搜索