• 八方資訊網歡迎您!
    八方資訊網>美食>正文

    ASP.NET Core 3.x 并發限制

    2020-03-30 12:44:17 來源: 閱讀:

    前言

    Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于傳入的請求進行排隊處理,避免線程池的不足.
    我們日常開發中可能常做的給某web服務器配置連接數以及,請求隊列大小,那么今天我們看看如何在通過中間件形式實現一個并發量以及隊列長度限制.

    Queue策略

    添加Nuget

    Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

    Copy public void ConfigureServices(IServiceCollection services)
    {
    services.AddQueuePolicy(options =>
    {
    //最大并發請求數
    options.MaxConcurrentRequests = 2;
    //請求隊列長度限制
    options.RequestQueueLimit = 1;
    });
    services.AddControllers();
    }
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
    //添加并發限制中間件
    app.UseConcurrencyLimiter();
    app.Run(async context =>
    {
    Task.Delay(100).Wait(); // 100ms sync-over-async

    await context.Response.WriteAsync("Hello World!");
    });
    if (env.IsDevelopment())
    {
    app.UseDeveloperExceptionPage();
    }

    app.UseHttpsRedirection();

    app.UseRouting();

    app.UseAuthorization();

    app.UseEndpoints(endpoints =>
    {
    endpoints.MapControllers();
    });
    }

    通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做并發量限制,以及隊列的長度;那么問題來了,他是怎么實現的呢?

    Copy public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action configure){ services.Configure(configure); services.AddSingleton();
    return services;
    }

    QueuePolicy采用的是SemaphoreSlim信號量設計,SemaphoreSlim、Semaphore(信號量)支持并發多線程進入被保護代碼,對象在初始化時會指定 最大任務數量,當線程請求訪問資源,信號量遞減,而當他們釋放時,信號量計數又遞增。

    Copy ///  /// 構造方法(初始化Queue策略) ///  ///  public QueuePolicy(IOptions options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if (_maxConcurrentRequests <= 0) { throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer."); } _requestQueueLimit = options.Value.RequestQueueLimit; if (_requestQueueLimit < 0)
    {
    throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
    }
    //使用SemaphoreSlim來限制任務最大個數
    _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
    }

    ConcurrencyLimiterMiddleware中間件

    Copy ///  /// Invokes the logic of the middleware. ///  /// The . /// A  that completes when the request leaves.
    public async Task Invoke(HttpContext context)
    {
    var waitInQueueTask = _queuePolicy.TryEnterAsync();

    // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
    bool result;

    if (waitInQueueTask.IsCompleted)
    {
    ConcurrencyLimiterEventSource.Log.QueueSkipped();
    result = waitInQueueTask.Result;
    }
    else
    {
    using (ConcurrencyLimiterEventSource.Log.QueueTimer())
    {
    result = await waitInQueueTask;
    }
    }

    if (result)
    {
    try
    {
    await _next(context);
    }
    finally
    {
    _queuePolicy.OnExit();
    }
    }
    else
    {
    ConcurrencyLimiterEventSource.Log.RequestRejected();
    ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
    context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
    await _onRejected(context);
    }
    }

    每次當我們請求的時候首先會調用_queuePolicy.TryEnterAsync(),進入該方法后先開啟一個私有lock鎖,再接著判斷總請求量是否≥(請求隊列限制的大小+最大并發請求數),如果當前數量超出了,那么我直接拋出,送你個503狀態;

    Copy if (result)
    {
    try
    {
    await _next(context);
    }
    finally
    {
    _queuePolicy.OnExit();
    }
    }
    else
    {
    ConcurrencyLimiterEventSource.Log.RequestRejected();
    ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
    context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
    await _onRejected(context);
    }

    問題來了,我這邊如果說還沒到你設置的大小呢,我這個請求沒有給你服務器造不成壓力,那么你給我處理一下吧.
    await _serverSemaphore.WaitAsync();異步等待進入信號量,如果沒有線程被授予對信號量的訪問權限,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止

    Copy lock (_totalRequestsLock)
    {
    if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
    {
    return false;
    }
    TotalRequests++;
    }
    //異步等待進入信號量,如果沒有線程被授予對信號量的訪問權限,則進入執行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止
    await _serverSemaphore.WaitAsync();
    return true;
    }

    返回成功后那么中間件這邊再進行處理,_queuePolicy.OnExit();通過該調用進行調用_serverSemaphore.Release();釋放信號燈,再對總請求數遞減

    Stack策略

    再來看看另一種方法,棧策略,他是怎么做的呢?一起來看看.再附加上如何使用的代碼.

    Copy public void ConfigureServices(IServiceCollection services)
    {
    services.AddStackPolicy(options =>
    {
    //最大并發請求數
    options.MaxConcurrentRequests = 2;
    //請求隊列長度限制
    options.RequestQueueLimit = 1;
    });
    services.AddControllers();
    }

    通過上面的配置,我們便可以對我們的應用程序執行出相應的策略.下面再來看看他是怎么實現的呢

    Copy public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action configure) { services.Configure(configure); services.AddSingleton();
    return services;
    }

    可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法

    Copy ///  /// 構造方法(初始化參數) ///  ///  public StackPolicy(IOptions options) { //棧分配 _buffer = new List();
    //隊列大小
    _maxQueueCapacity = options.Value.RequestQueueLimit;
    //最大并發請求數
    _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
    //剩余可用空間
    _freeServerSpots = options.Value.MaxConcurrentRequests;
    }

    當我們通過中間件請求調用,_queuePolicy.TryEnterAsync()時,首先會判斷我們是否還有訪問請求次數,如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執行下一步,如果當前隊列=我們設置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留后面的請求;

    Copy public ValueTask TryEnterAsync() { lock (_bufferLock) { if (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } // 如果隊列滿了,取消先前的請求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity = true; _buffer[_head].Complete(false); _queueLength--; } var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this); _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength < _buffer.Count)
    {
    _buffer[_head] = tcs;
    }
    else
    {
    _buffer.Add(tcs);
    }
    _queueLength++;
    // increment _head for next time
    _head++;
    if (_head == _maxQueueCapacity)
    {
    _head = 0;
    }
    return tcs.GetValueTask();
    }
    }

    當我們請求后調用_queuePolicy.OnExit();出棧,再將請求長度遞減

    Copy public void OnExit()
    {
    lock (_bufferLock)
    {
    if (_queueLength == 0)
    {
    _freeServerSpots++;

    if (_freeServerSpots > _maxConcurrentRequests)
    {
    _freeServerSpots--;
    throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
    }

    return;
    }

    // step backwards and launch a new task
    if (_head == 0)
    {
    _head = _maxQueueCapacity - 1;
    }
    else
    {
    _head--;
    }
    //退出,出棧
    _buffer[_head].Complete(true);
    _queueLength--;
    }
    }

    總結

    基于棧結構的特點,在實際應用中,通常只會對棧執行以下兩種操作:

    • 向棧中添加元素,此過程被稱為"進棧"(入棧或壓棧);
    • 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);

    隊列存儲結構的實現有以下兩種方式:

    • 順序隊列:在順序表的基礎上實現的隊列結構;
    • 鏈隊列:在鏈表的基礎上實現的隊列結構;

    本文為企業推廣,本網站不做任何建議,僅提供參考,作為信息展示!

    推薦閱讀:手機充電寶品牌排行

    網友評論
    請登錄后進行評論| 0條評論

    請文明發言,還可以輸入140

    您的評論已經發表成功,請等候審核

    小提示:您要為您發表的言論后果負責,請各位遵守法紀注意語言文明

    回到首頁 回到頂部
    八方資訊網 關于我們| 聯系我們| 招聘信息| 老版地圖| 網站地圖
    免責聲明:八方資訊網所有文字、圖片、視頻、音頻等資料均來自互聯網,不代表本站贊同其觀點,本站亦不為其版權負責。相關作品的原創性、文中陳述文字以及內容數據龐雜本站無法一一核實,如果您發現本網站上有侵犯您的合法權益的內容,請聯系我們,本網站將立即予以刪除!
    Copyright © 2012-2019 http://www.quan28.cn, All rights reserved.
    主站蜘蛛池模板: 国产在线不卡午夜精品2021| 亚洲国产精品国自产电影| 精品人妻久久久久久888| 国内精品久久国产大陆| 亚洲av无码国产精品色在线看不卡| 中文字幕成人精品久久不卡| 香蕉国产精品频视| 亚洲精品无码AV中文字幕电影网站| 久久99热狠狠色精品一区| 久久久久久亚洲精品无码| 久久丫精品国产亚洲av| 亚洲国产精品久久久久久| 精品久人妻去按摩店被黑人按中出| 亚洲av永久无码精品国产精品| 亚洲精品欧美日韩| 国产一区二区精品尤物| 熟妇人妻VA精品中文字幕| 亚洲视频精品在线| 久久久99精品一区二区| 自怕偷自怕亚洲精品| 亚洲AV无码成人精品区蜜桃| 91不卡在线精品国产| 69SEX久久精品国产麻豆| 国产乱人伦偷精品视频不卡| 国产成人精品日本亚洲| 久久久精品久久久久久| 国产精品久久久天天影视香蕉| 久久精品中文字幕无码绿巨人| 国产精品永久免费视频| 国产精品国色综合久久| 亚洲电影日韩精品| 99热成人精品免费久久| 国产精品多人p群无码| 四虎国产精品永久地址入口| 91麻豆精品一二三区在线| 国产午夜精品久久久久免费视 | 国产精品国产亚洲精品看不卡| 亚洲日韩一页精品发布| 国产一区二区精品久久岳| 精品久久久久中文字幕一区| 青青青青久久精品国产|