当前位置 : 主页 > 网络编程 > ASP >

Asp.Net Core7 preview4限流中间件新特性详解

来源:互联网 收集:自由互联 发布时间:2023-01-30
目录 前言 UseRateLimiter尝鲜 本地测试 ConcurrencyLimiter源码 获取令牌 尝试获取令牌核心逻辑 令牌获取失败后进入等待队列 归还令牌 总结 前言 限流是应对流量暴增或某些用户恶意攻击等场
目录
  • 前言
  • UseRateLimiter尝鲜
  • 本地测试
  • ConcurrencyLimiter源码
    • 获取令牌
    • 尝试获取令牌核心逻辑
    • 令牌获取失败后进入等待队列
    • 归还令牌
  • 总结

    前言

    限流是应对流量暴增或某些用户恶意攻击等场景的重要手段之一,然而微软官方从未支持这一重要特性,AspNetCoreRateLimit这一第三方库限流库一般作为首选使用,然而其配置参数过于繁多,对使用者造成较大的学习成本。令人高兴的是,在刚刚发布的.NET 7 Preview 4中开始支持限流中间件。

    UseRateLimiter尝鲜

    安装.NET 7.0 SDK(v7.0.100-preview.4)

    通过nuget包安装Microsoft.AspNetCore.RateLimiting

    创建.Net7网站应用,注册中间件

    全局限流并发1个

    app.UseRateLimiter(new RateLimiterOptions
    {
        Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
        {
            return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
                _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
        })
    });
    

    根据不同资源不同限制并发数,/api前缀的资源租约数2,等待队列长度为2,其他默认租约数1,队列长度1。

    app.UseRateLimiter(new RateLimiterOptions()
    {
        // 触发限流的响应码
        DefaultRejectionStatusCode = 500,
        OnRejected = async (ctx, rateLimitLease) =>
        {
            // 触发限流回调处理
        },
        Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
        {
            if (resource.Request.Path.StartsWithSegments("/api"))
            {
                return RateLimitPartition.CreateConcurrencyLimiter("WebApiLimiter",
                    _ => new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2));
            }
            else
            {
                return RateLimitPartition.CreateConcurrencyLimiter("DefaultLimiter",
                    _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
            }
        })
    });
    

    本地测试

    新建一个webapi项目,并注册限流中间件如下

    using Microsoft.AspNetCore.RateLimiting;
    using System.Threading.RateLimiting;
    var builder = WebApplication.CreateBuilder(args);
    // Add services to the container.
    builder.Services.AddControllers();
    // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
    builder.Services.AddEndpointsApiExplorer();
    builder.Services.AddSwaggerGen();
    var app = builder.Build();
    // Configure the HTTP request pipeline.
    if (app.Environment.IsDevelopment())
    {
        app.UseSwagger();
        app.UseSwaggerUI();
    }
    app.UseRateLimiter(new RateLimiterOptions
    {
        DefaultRejectionStatusCode = 500,
        OnRejected = async (ctx, lease) =>
        {
            await Task.FromResult(ctx.Response.WriteAsync("ConcurrencyLimiter"));
        },
        Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
        {
            return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
                _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
        })
    });
    app.UseHttpsRedirection();
    app.UseAuthorization();
    app.MapControllers();
    app.Run();
    

    启动项目,使用jmeter测试100并发,请求接口/WeatherForecast

    所有请求处理成功,失败0!

    这个结果是不是有点失望,其实RateLimitPartition.CreateConcurrencyLimiter创建的限流器是
    ConcurrencyLimiter,后续可以实现个各种策略的限流器进行替换之。

    看了ConcurrencyLimiter的实现,其实就是令牌桶的限流思想,上面配置的new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)),第一个1代表令牌的个数,第二个1代表可以当桶里的令牌为空时,进入等待队列,而不是直接失败,当前面的请求结束后,会归还令牌,此时等待的请求就可以拿到令牌了,QueueProcessingOrder.NewestFirst代表最新的请求优先获取令牌,也就是获取令牌时非公平的,还有另一个枚举值QueueProcessingOrder.OldestFirst老的优先,获取令牌是公平的。只要我们获取到令牌的人干活速度快,虽然我们令牌只有1,并发就很高。

    3. 测试触发失败场景

    只需要让我们拿到令牌的人持有时间长点,就能轻易的触发。

    调整jmater并发数为10

    相应内容也是我们设置的内容。

    ConcurrencyLimiter源码

    令牌桶限流思想

    获取令牌

            protected override RateLimitLease AcquireCore(int permitCount)
            {
                // These amounts of resources can never be acquired
                if (permitCount > _options.PermitLimit)
                {
                    throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));
                }
                ThrowIfDisposed();
                // Return SuccessfulLease or FailedLease to indicate limiter state
                if (permitCount == 0)
                {
                    return _permitCount > 0 ? SuccessfulLease : FailedLease;
                }
                // Perf: Check SemaphoreSlim implementation instead of locking
                if (_permitCount >= permitCount)
                {
                    lock (Lock)
                    {
                        if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
                        {
                            return lease;
                        }
                    }
                }
                return FailedLease;
            }
    

    尝试获取令牌核心逻辑

            private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)
            {
                ThrowIfDisposed();
                // if permitCount is 0 we want to queue it if there are no available permits
                if (_permitCount >= permitCount && _permitCount != 0)
                {
                    if (permitCount == 0)
                    {
                        // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
                        lease = SuccessfulLease;
                        return true;
                    }
                    // a. if there are no items queued we can lease
                    // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
                    if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
                    {
                        _idleSince = null;
                        _permitCount -= permitCount;
                        Debug.Assert(_permitCount >= 0);
                        lease = new ConcurrencyLease(true, this, permitCount);
                        return true;
                    }
                }
                lease = null;
                return false;
            }
    

    令牌获取失败后进入等待队列

     protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)
            {
                // These amounts of resources can never be acquired
                if (permitCount > _options.PermitLimit)
                {
                    throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));
                }
                // Return SuccessfulLease if requestedCount is 0 and resources are available
                if (permitCount == 0 && _permitCount > 0 && !_disposed)
                {
                    return new ValueTask<RateLimitLease>(SuccessfulLease);
                }
                // Perf: Check SemaphoreSlim implementation instead of locking
                lock (Lock)
                {
                    if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
                    {
                        return new ValueTask<RateLimitLease>(lease);
                    }
                    // Avoid integer overflow by using subtraction instead of addition
                    Debug.Assert(_options.QueueLimit >= _queueCount);
                    if (_options.QueueLimit - _queueCount < permitCount)
                    {
                        if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
                        {
                            // remove oldest items from queue until there is space for the newest request
                            do
                            {
                                RequestRegistration oldestRequest = _queue.DequeueHead();
                                _queueCount -= oldestRequest.Count;
                                Debug.Assert(_queueCount >= 0);
                                if (!oldestRequest.Tcs.TrySetResult(FailedLease))
                                {
                                    // Updating queue count is handled by the cancellation code
                                    _queueCount += oldestRequest.Count;
                                }
                            }
                            while (_options.QueueLimit - _queueCount < permitCount);
                        }
                        else
                        {
                            // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
                            return new ValueTask<RateLimitLease>(QueueLimitLease);
                        }
                    }
                    CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
                    CancellationTokenRegistration ctr = default;
                    if (cancellationToken.CanBeCanceled)
                    {
                        ctr = cancellationToken.Register(static obj =>
                        {
                            ((CancelQueueState)obj!).TrySetCanceled();
                        }, tcs);
                    }
                    RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
                    _queue.EnqueueTail(request);
                    _queueCount += permitCount;
                    Debug.Assert(_queueCount <= _options.QueueLimit);
                    return new ValueTask<RateLimitLease>(request.Tcs.Task);
                }
            }
    

    归还令牌

     private void Release(int releaseCount)
            {
                lock (Lock)
                {
                    if (_disposed)
                    {
                        return;
                    }
                    _permitCount += releaseCount;
                    Debug.Assert(_permitCount <= _options.PermitLimit);
                    while (_queue.Count > 0)
                    {
                        RequestRegistration nextPendingRequest =
                            _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                            ? _queue.PeekHead()
                            : _queue.PeekTail();
                        if (_permitCount >= nextPendingRequest.Count)
                        {
                            nextPendingRequest =
                                _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                                ? _queue.DequeueHead()
                                : _queue.DequeueTail();
                            _permitCount -= nextPendingRequest.Count;
                            _queueCount -= nextPendingRequest.Count;
                            Debug.Assert(_permitCount >= 0);
                            ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
                            // Check if request was canceled
                            if (!nextPendingRequest.Tcs.TrySetResult(lease))
                            {
                                // Queued item was canceled so add count back
                                _permitCount += nextPendingRequest.Count;
                                // Updating queue count is handled by the cancellation code
                                _queueCount += nextPendingRequest.Count;
                            }
                            nextPendingRequest.CancellationTokenRegistration.Dispose();
                            Debug.Assert(_queueCount >= 0);
                        }
                        else
                        {
                            break;
                        }
                    }
                    if (_permitCount == _options.PermitLimit)
                    {
                        Debug.Assert(_idleSince is null);
                        Debug.Assert(_queueCount == 0);
                        _idleSince = Stopwatch.GetTimestamp();
                    }
                }
            }
    

    总结

    虽然这次官方对限流进行了支持,但貌似还不能支持对ip或client级别的限制支持,对于更高级的限流策略仍需要借助第三方库或自己实现,期待后续越来越完善,更多关于Asp.Net Core7 preview限流中间件的资料请关注自由互联其它相关文章!

    上一篇:asp.net core实体类生产CRUD后台管理界面
    下一篇:没有了
    网友评论