ASP.NET Core 消息队列----Redis队列

- Redis

一、安装InitQ 及 StackExchange.Redis

d1.png

二、Redis 消息队列 启动服务

 public static class RedisMqSetup
 {
     public static void AddRedisInitMqSetup(this IServiceCollection services)
     {
         if (services == null) throw new ArgumentNullException(nameof(services));
         if (true) //开关可自定义
         {
             services.AddInitQ(m =>
             {
                 //时间间隔
                 m.SuspendTime = 2000;
                 //redis服务器地址
                 m.ConnectionString = AppSettings.app(new string[] { "Redis", "ConnectionString" });
                 //对应的订阅者类,需要new一个实例对象,当然你也可以传参,比如日志对象
                 m.ListSubscribe = new List<Type>() {
                     typeof(RedisSubscribeA),
                     typeof(RedisSubscribeB)
                 };
                 //显示日志
                 m.ShowLog = false;
             });
             services.AddScoped(typeof(IRedisBasketRepository), typeof(RedisBasketRepository));
         }
     }
     
 }

三、创建订阅类

 public class RedisSubscribeA : IRedisSubscribe
 {
     [Subscribe("RedisTest")]
     private async Task SubRedisTest(string msg)
     {
         Console.WriteLine($"A类--->订阅者A1消息消息:{msg}");
         await Task.CompletedTask;
     }

     [Subscribe("RedisTest")]
     private async Task SubRedisTest1(string msg)
     {
         Console.WriteLine($"A类--->订阅者A2消息消息:{msg}");
         await Task.CompletedTask;
     }
 }
    public class RedisSubscribeB : IRedisSubscribe
    {
       
        [Subscribe("RedisTest")]
        private async Task SubRedisTest(string msg)
        {
            Console.WriteLine($"B类--->订阅者B消费消息:{msg}");
        }
    }

四、调用注册

builder.Services.AddRedisMqSetup();


五、创建测试接口

[HttpGet]
[AllowAnonymous]
public async Task RedisMq([FromServices] IRedisBasketRepository _redisBasketRepository)
{
   var msg = $"这里是一条日志{DateTime.Now}";
 for (int i = 0; i < 20; i++)
 {
     await _redisBasketRepository.ListLeftPushAsync("RedisTest", msg+"------"+i, 1);
 }
}

六、创建 IRedisBasketRepository,RedisBasketRepository

public interface IRedisBasketRepository
{

    //获取 Reids 缓存值
    Task<string> GetValue(string key);
    //获取值,并序列化
    Task<TEntity> Get<TEntity>(string key);
    //保存
    Task Set(string key, object value, TimeSpan cacheTime);
    //判断是否存在
    Task<bool> Exist(string key);
    //移除某一个缓存值
    Task Remove(string key);
    //全部清除
    Task Clear();
    Task<RedisValue[]> ListRangeAsync(string redisKey);
    Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1);
    Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1);
    Task<long> ListRightPushAsync(string redisKey, IEnumerable<string> redisValue, int db = -1);
    Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class;
    Task<T> ListRightPopAsync<T>(string redisKey, int db = -1) where T : class;
    Task<string> ListLeftPopAsync(string redisKey, int db = -1);
    Task<string> ListRightPopAsync(string redisKey, int db = -1);
    Task<long> ListLengthAsync(string redisKey, int db = -1);
    Task<IEnumerable<string>> ListRangeAsync(string redisKey, int db = -1);
    Task<IEnumerable<string>> ListRangeAsync(string redisKey, int start, int stop, int db = -1);
    Task<long> ListDelRangeAsync(string redisKey, string redisValue, long type = 0, int db = -1);
    Task ListClearAsync(string redisKey, int db = -1);

}
public class RedisBasketRepository : IRedisBasketRepository
{
    private readonly ILogger<RedisBasketRepository> _logger;
    private readonly ConnectionMultiplexer _redis;
    private readonly IDatabase _database;

    public RedisBasketRepository(ILogger<RedisBasketRepository> logger, ConnectionMultiplexer redis)
    {
        _logger = logger;
        _redis = redis;
        _database = redis.GetDatabase();
    }

    private IServer GetServer()
    {
        var endpoint = _redis.GetEndPoints();
        return _redis.GetServer(endpoint.First());
    }

    public async Task Clear()
    {
        foreach (var endPoint in _redis.GetEndPoints())
        {
            var server = GetServer();
            foreach (var key in server.Keys())
            {
                await _database.KeyDeleteAsync(key);
            }
        }
    }
    public async Task<bool> Exist(string key)
    {
        return await _database.KeyExistsAsync(key);
    }

    public async Task<string> GetValue(string key)
    {
        return await _database.StringGetAsync(key);
    }
    public async Task Remove(string key)
    {
        await _database.KeyDeleteAsync(key);
    }
    public async Task Set(string key, object value, TimeSpan cacheTime)
    {
        if (value != null)
        {
            if (value is string cacheValue)
            {
                // 字符串无需序列化
                await _database.StringSetAsync(key, cacheValue, cacheTime);
            }
            else
            {
                //序列化,将object值生成RedisValue
                await _database.StringSetAsync(key, SerializeHelper.Serialize(value), cacheTime);
            }
        }
    }
    public async Task<TEntity> Get<TEntity>(string key)
    {
        var value = await _database.StringGetAsync(key);
        if (value.HasValue)
        {
            //需要用的反序列化,将Redis存储的Byte[],进行反序列化
            return SerializeHelper.Deserialize<TEntity>(value);
        }
        else
        {
            return default(TEntity);
        }
    }

    /// <summary>
    /// 根据key获取RedisValue
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="redisKey"></param>
    /// <returns></returns>
    public async Task<RedisValue[]> ListRangeAsync(string redisKey)
    {
        return await _database.ListRangeAsync(redisKey);
    }

    /// <summary>
    /// 在列表头部插入值。如果键不存在,先创建再插入值
    /// </summary>
    /// <param name="redisKey"></param>
    /// <param name="redisValue"></param>
    /// <returns></returns>
    public async Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1)
    {
        return await _database.ListLeftPushAsync(redisKey, redisValue);
    }
    /// <summary>
    /// 在列表尾部插入值。如果键不存在,先创建再插入值
    /// </summary>
    /// <param name="redisKey"></param>
    /// <param name="redisValue"></param>
    /// <returns></returns>
    public async Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1)
    {
        return await _database.ListRightPushAsync(redisKey, redisValue);
    }

    /// <summary>
    /// 在列表尾部插入数组集合。如果键不存在,先创建再插入值
    /// </summary>
    /// <param name="redisKey"></param>
    /// <param name="redisValue"></param>
    /// <returns></returns>
    public async Task<long> ListRightPushAsync(string redisKey, IEnumerable<string> redisValue, int db = -1)
    {
        var redislist = new List<RedisValue>();
        foreach (var item in redisValue)
        {
            redislist.Add(item);
        }
        return await _database.ListRightPushAsync(redisKey, redislist.ToArray());
    }

    /// <summary>
    /// 移除并返回存储在该键列表的第一个元素  反序列化
    /// </summary>
    /// <param name="redisKey"></param>
    /// <returns></returns>
    public async Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class
    {
        return JsonConvert.DeserializeObject<T>(await _database.ListLeftPopAsync(redisKey));
    }
    /// <summary>
    /// 移除并返回存储在该键列表的最后一个元素   反序列化
    /// 只能是对象集合
    /// </summary>
    /// <param name="redisKey"></param>
    /// <returns></returns>
    public async Task<T> ListRightPopAsync<T>(string redisKey, int db = -1) where T : class
    {
        return JsonConvert.DeserializeObject<T>(await _database.ListRightPopAsync(redisKey));
    }

    /// <summary>
    /// 移除并返回存储在该键列表的第一个元素   
    /// </summary>
    /// <param name="redisKey"></param>
    /// <param name="db"></param>
    /// <returns></returns>
    public async Task<string> ListLeftPopAsync(string redisKey, int db = -1)
    {
        return await _database.ListLeftPopAsync(redisKey);
    }

    /// <summary>
    /// 移除并返回存储在该键列表的最后一个元素   
    /// </summary>
    /// <typeparam name="T"></typeparam>
    /// <param name="redisKey"></param>
    /// <param name="db"></param>
    /// <returns></returns>
    public async Task<string> ListRightPopAsync(string redisKey, int db = -1)
    {
        return await _database.ListRightPopAsync(redisKey);
    }

    /// <summary>
    /// 列表长度
    /// </summary>
    /// <param name="redisKey"></param>
    /// <param name="db"></param>
    /// <returns></returns>
    public async Task<long> ListLengthAsync(string redisKey, int db = -1)
    {
        return await _database.ListLengthAsync(redisKey);
    }

    /// <summary>
    /// 返回在该列表上键所对应的元素
    /// </summary>
    /// <param name="redisKey"></param>
    /// <returns></returns>
    public async Task<IEnumerable<string>> ListRangeAsync(string redisKey, int db = -1)
    {
        var result = await _database.ListRangeAsync(redisKey);
        return result.Select(o => o.ToString());
    }

    /// <summary>
    /// 根据索引获取指定位置数据
    /// </summary>
    /// <param name="redisKey"></param>
    /// <param name="start"></param>
    /// <param name="stop"></param>
    /// <param name="db"></param>
    /// <returns></returns>
    public async Task<IEnumerable<string>> ListRangeAsync(string redisKey, int start, int stop, int db = -1)
    {
        var result = await _database.ListRangeAsync(redisKey, start, stop);
        return result.Select(o => o.ToString());
    }

    /// <summary>
    /// 删除List中的元素 并返回删除的个数
    /// </summary>
    /// <param name="redisKey">key</param>
    /// <param name="redisValue">元素</param>
    /// <param name="type">大于零 : 从表头开始向表尾搜索,小于零 : 从表尾开始向表头搜索,等于零:移除表中所有与 VALUE 相等的值</param>
    /// <param name="db"></param>
    /// <returns></returns>
    public async Task<long> ListDelRangeAsync(string redisKey, string redisValue, long type = 0, int db = -1)
    {
        return await _database.ListRemoveAsync(redisKey, redisValue, type);
    }

    /// <summary>
    /// 清空List
    /// </summary>
    /// <param name="redisKey"></param>
    /// <param name="db"></param>
    public async Task ListClearAsync(string redisKey, int db = -1)
    {
        await _database.ListTrimAsync(redisKey, 1, 0);
    }
}

调用注册

builder.Services.AddRedisInitMqSetup();

测试发布消息



参考:https://www.cnblogs.com/tibos/p/14944832.html

          https://www.cnblogs.com/tibos/p/11858095.html

相关文章!
  • ASP.NET Core 消息队列---

    Dapr (Distributed Application Runtime)是一个可移植的、事件驱动的运行时,它使任何开发人员都可以轻松地构建运行在云和边缘上的弹性

  • ASP.NET Core 消息队列---

  • ASP.NET Core 消息队列---

    RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布订阅。其中一对