C# .Net Core实现RedisClient
作者:csframework|C/S框架网  发布日期:2021/12/30 13:18:40
  C# .Net Core实现RedisClient


C# .Net Core实现RedisClient


引言

  最近工作上有需要使用redis,于是便心血来潮打算自己写一个C#客户端。经过几天的努力,目前该客户端已经基本成型,下面简单介绍一下。

通信协议

  要想自行实现redisClient,则必须先要了解Redis的socket能信协议。新版统一请求协议在 Redis 1.2 版本中引入, 并最终在 Redis 2.0 版本成为 Redis 服务器通信的标准方式。在这个协议中, 所有发送至 Redis 服务器的参数都是二进制安全(binary safe)的。

  以下是这个协议的一般形式:

*<参数数量> CR LF
$<参数 1 的字节数量> CR LF
<参数 1 的数据> CR LF
...
$<参数 N 的字节数量> CR LF
<参数 N 的数据> CR LF

  注:命令本身也作为协议的其中一个参数来发送。举个例子, 以下是一个命令协议的打印版本:

 *3
 $3
 SET
 $5
 mykey
 $7
 myvalue

  这个命令的实际协议值如下:

 "*3\r\n$3\r\nSET\r\n$5\r\nmykey\r\n$7\r\nmyvalue\r\n"

  稍后看到, 这种格式除了用作命令请求协议之外, 也用在命令的回复协议中: 这种只有一个参数的回复格式被称为批量回复(Bulk Reply)。统一协议请求原本是用在回复协议中, 用于将列表的多个项返回给客户端的, 这种回复格式被称为多条批量回复(Multi Bulk Reply)。一个多条批量回复以*<argc>\r\n为前缀, 后跟多条不同的批量回复, 其中argc为这些批量回复的数量。

  Redis 命令会返回多种不同类型的回复。一个状态回复(或者单行回复,single line reply)是一段以"+"开始、"\r\n"结尾的单行字符串。通过检查服务器发回数据的第一个字节, 可以确定这个回复是什么类型:

  • 状态回复(status reply)的第一个字节是"+"
  • 错误回复(error reply)的第一个字节是"-"
  • 整数回复(integer reply)的第一个字节是":"
  • 批量回复(bulk reply)的第一个字节是"$"
  • 多条批量回复(multi bulk reply)的第一个字节是"*"

.net Core Socket

  说起socket,就不得不说IOCP了,这个方案本身就是为了解决多连接、高并发而设计的;但是话又说回来,任何方案都有局限性,不可能解决所有问题;这里不去讨论用在这里是否合适,反正本人就是想这么试一把:用一个简单的ioc模式实现SAEA.Socket,并为此设定各种场景,反过来优化SAEA.Socket本身。下面是一段服务器接收连接的代码:

         private void ProcessAccept(SocketAsyncEventArgs args)
         {
             if (args == null)
             {
                 args = new SocketAsyncEventArgs();
                 args.Completed += ProcessAccepted;
             }
             else
             {
                 args.AcceptSocket = null;
             }
             if (!_listener.AcceptAsync(args))
             {
                 ProcessAccepted(_listener, args);
             }
         }

项目结构

  在网上找到redis的命令文档后,本人觉的准备工作差不多了,可以初步定一下项目结构:

  Core:定义的是redisclient相关最基本的业务

  Interface:定义的是一些需要抽象出来的接口

  Model:定义的是redis的数据模型及其请求、回复的类型枚举

  Net:这里就是将继承实现SAEA.Socket而来的RedisConnection通信基础

命令解码器

  通过前面的准备工作了解到redisClient的关键在于命令的编解码,至于高大上算法或redis官方算法的实现,本人没有去详细了解,一冲动就自行实现了自定义版的解码器。

         public string Coder(RequestType commandName, params string[] @params)
         {
             _autoResetEvent.WaitOne();
             _commandName = commandName;
             var sb = new StringBuilder();
             sb.AppendLine("*" + @params.Length);
             foreach (var param in @params)
             {
                 sb.AppendLine("$" + param.Length);
                 sb.AppendLine(param);
             }
             return sb.ToString();
         }
        public ResponseData Decoder()
         {
             var result = new ResponseData();
 
             string command = null;
 
             string error = null;
 
             var len = 0;
 
             switch (_commandName)
             {
                 case RequestType.PING:
                     command = BlockDequeue();
                     if (GetStatus(command, out error))
                     {
                         result.Type = ResponseType.OK;
                         result.Data = "PONG";
                     }
                     else
                     {
                         result.Type = ResponseType.Error;
                         result.Data = error;
                     }
                     break;
                 case RequestType.AUTH:
                 case RequestType.SELECT:
                 case RequestType.SLAVEOF:
                 case RequestType.SET:
                 case RequestType.DEL:
                 case RequestType.HSET:
                 case RequestType.HDEL:
                 case RequestType.LSET:
                     command = BlockDequeue();
                     if (GetStatus(command, out error))
                     {
                         result.Type = ResponseType.OK;
                         result.Data = "OK";
                     }
                     else
                     {
                         result.Type = ResponseType.Error;
                         result.Data = error;
                     }
                     break;
                 case RequestType.TYPE:
                     command = BlockDequeue();
                     if (GetStatusString(command, out string msg))
                     {
                         result.Type = ResponseType.OK;
                     }
                     else
                     {
                         result.Type = ResponseType.Error;
                     }
                     result.Data = msg;
                     break;
                 case RequestType.GET:
                 case RequestType.GETSET:
                 case RequestType.HGET:
                 case RequestType.LPOP:
                 case RequestType.RPOP:
                 case RequestType.SRANDMEMBER:
                 case RequestType.SPOP:
                     len = GetWordsNum(BlockDequeue(), out error);
                     if (len == -1)
                     {
                         result.Type = ResponseType.Empty;
                         result.Data = error;
                     }
                     else
                     {
                         result.Type = ResponseType.String;
                         result.Data += BlockDequeue();
                     }
                     break;
                 case RequestType.KEYS:
                 case RequestType.HKEYS:
                 case RequestType.LRANGE:
                 case RequestType.SMEMBERS:
                     result.Type = ResponseType.Lines;
                     var sb = new StringBuilder();
                     var rn = GetRowNum(BlockDequeue(), out error);
                     if (!string.IsNullOrEmpty(error))
                     {
                         result.Type = ResponseType.Error;
                         result.Data = error;
                         break;
                     }
                     //再尝试读取一次,发现有回车行出现
                     if (rn == -1) rn = GetRowNum(BlockDequeue(), out error);
                     if (!string.IsNullOrEmpty(error))
                     {
                         result.Type = ResponseType.Error;
                         result.Data = error;
                         break;
                     }
                     if (rn > 0)
                     {
                         for (int i = 0; i < rn; i++)
                         {
                             len = GetWordsNum(BlockDequeue(), out error);
                             sb.AppendLine(BlockDequeue());
                         }
                     }
                     result.Data = sb.ToString();
                     break;
                 case RequestType.HGETALL:
                 case RequestType.ZRANGE:
                 case RequestType.ZREVRANGE:
                     result.Type = ResponseType.KeyValues;
                     sb = new StringBuilder();
                     rn = GetRowNum(BlockDequeue(), out error);
                     if (!string.IsNullOrEmpty(error))
                     {
                         result.Type = ResponseType.Error;
                         result.Data = error;
                         break;
                     }
                     if (rn > 0)
                     {
                         for (int i = 0; i < rn; i++)
                         {
                             len = GetWordsNum(BlockDequeue(), out error);
                             sb.AppendLine(BlockDequeue());
                         }
                     }
                     result.Data = sb.ToString();
                     break;
                 case RequestType.DBSIZE:
                 case RequestType.EXISTS:
                 case RequestType.EXPIRE:
                 case RequestType.PERSIST:
                 case RequestType.SETNX:
                 case RequestType.HEXISTS:
                 case RequestType.HLEN:
                 case RequestType.LLEN:
                 case RequestType.LPUSH:
                 case RequestType.RPUSH:
                 case RequestType.LREM:
                 case RequestType.SADD:
                 case RequestType.SCARD:
                 case RequestType.SISMEMBER:
                 case RequestType.SREM:
                 case RequestType.ZADD:
                 case RequestType.ZCARD:
                 case RequestType.ZCOUNT:
                 case RequestType.ZREM:
                 case RequestType.PUBLISH:
                     var val = GetValue(BlockDequeue(), out error);
                     if (!string.IsNullOrEmpty(error))
                     {
                         result.Type = ResponseType.Error;
                         result.Data = error;
                         break;
                     }
                     if (val == 0)
                     {
                         result.Type = ResponseType.Empty;
                     }
                     else
                     {
                         result.Type = ResponseType.OK;
                     }
                     result.Data = val.ToString();
                     break;
                 case RequestType.INFO:
                     var rnum = GetWordsNum(BlockDequeue(), out error);
                     if (!string.IsNullOrEmpty(error))
                     {
                         result.Type = ResponseType.Error;
                         result.Data = error;
                         break;
                     }
                     var info = "";
                     while (info.Length < rnum)
                     {
                         info += BlockDequeue();
                     }
                     result.Type = ResponseType.String;
                     result.Data = info;
                     break;
                 case RequestType.SUBSCRIBE:
                     var r = "";
                     while (IsSubed)
                     {
                         r = BlockDequeue();
                         if (r == "message\r\n")
                         {
                             result.Type = ResponseType.Sub;
                             BlockDequeue();
                             result.Data = BlockDequeue();
                             BlockDequeue();
                             result.Data += BlockDequeue();
                             break;
                         }
                     }
                     break;
                 case RequestType.UNSUBSCRIBE:
                     var rNum = GetRowNum(BlockDequeue(), out error);
                     var wNum = GetWordsNum(BlockDequeue(), out error);
                     BlockDequeue();
                     wNum = GetWordsNum(BlockDequeue(), out error);
                     var channel = BlockDequeue();
                     var vNum = GetValue(BlockDequeue(), out error);
                     IsSubed = false;
                     break;
             }
             _autoResetEvent.Set();
             return result;
         }

命令的封装与测试

  有了socket、redisCoder之后,现在就可以按照官方的redis命令来进行.net core的封装了。本人将这些操作封装到RedisClient、RedisDataBase两个类中,然后又想到连接复用的问题,简单实现了一个连接池RedisClientFactory的类。这样一来就可以好好的来实验一把,看看之前的设想最终能不能实现了:

 /****************************************************************************
 *Copyright (c) 2018 Microsoft All Rights Reserved.
 *CLR版本: 4.0.30319.42000
 *机器名称:WENLI-PC
 *公司名称:Microsoft
 *命名空间:SAEA.RedisSocketTest
 *文件名: Program
 *版本号: V1.0.0.0
 *唯一标识:3d4f939c-3fb9-40e9-a0e0-c7ec773539ae
 *当前的用户域:WENLI-PC
 *创建人: yswenli
 *电子邮箱:wenguoli_520@qq.com
 *创建时间:2018/3/17 10:37:15
 *描述:
 *
 *=====================================================================
 *修改标记
 *修改时间:2018/3/19 10:37:15
 *修改人: yswenli
 *版本号: V1.0.0.0
 *描述:
 *
 *****************************************************************************/
 using SAEA.Commom;
 using SAEA.RedisSocket;
 using System;
 
 namespace SAEA.RedisSocketTest
 {
     class Program
     {
         static void Main(string[] args)
         {
             ConsoleHelper.Title = "SAEA.RedisSocketTest";
             ConsoleHelper.WriteLine("输入ip:port连接RedisServer");
 
             var ipPort = ConsoleHelper.ReadLine();
             if (string.IsNullOrEmpty(ipPort))
             {
                 ipPort = "127.0.0.1:6379";
             }
             RedisClient redisClient = new RedisClient(ipPort);
             redisClient.Connect(); 
             //redisClient.Connect("wenli"); 
 
 
             var info = redisClient.Info();
             if (info.Contains("NOAUTH Authentication required."))
             {
                 while (true)
                 {
                     ConsoleHelper.WriteLine("请输入redis连接密码");
                     var auth = ConsoleHelper.ReadLine();
                     if (string.IsNullOrEmpty(auth))
                     {
                         auth = "yswenli";
                     }
                     var a = redisClient.Auth(auth);
                     if (a.Contains("OK"))
                     {
                         break;
                     }
                     else
                     {
                         ConsoleHelper.WriteLine(a);
                     }
                 }
             }
 
             //redisConnection.SlaveOf();
 
             //redisConnection.Ping();
 
             redisClient.Select(1);
 
             //ConsoleHelper.WriteLine(redisConnection.Type("key0"));
 
             ConsoleHelper.WriteLine("dbSize:{0}", redisClient.DBSize().ToString());
 
 
             RedisOperationTest(redisClient, true);
             ConsoleHelper.ReadLine();
         }
 
         private static void RedisOperationTest(object sender, bool status)
         {
             RedisClient redisClient = (RedisClient)sender;
             if (status)
             {
                 ConsoleHelper.WriteLine("连接redis服务器成功!");
 
                 #region key value
 
                 ConsoleHelper.WriteLine("回车开始kv插值操作...");
                 ConsoleHelper.ReadLine();
                 for (int i = 0; i < 1000; i++)
                 {
                     redisClient.GetDataBase().Set("key" + i, "val" + i);
                 }
                 //redisConnection.GetDataBase().Exists("key0");
                 ConsoleHelper.WriteLine("kv插入完成...");
 
                 ConsoleHelper.WriteLine("回车开始获取kv值操作...");
                 ConsoleHelper.ReadLine();
 
                 var keys = redisClient.GetDataBase().Keys().Data.ToArray(false, "\r\n");
 
                 foreach (var key in keys)
                 {
                     var val = redisClient.GetDataBase().Get(key);
                     ConsoleHelper.WriteLine("Get val:" + val);
                 }
                 ConsoleHelper.WriteLine("获取kv值完成...");
 
                 ConsoleHelper.WriteLine("回车开始开始kv移除操作...");
                 ConsoleHelper.ReadLine();
                 foreach (var key in keys)
                 {
                     redisClient.GetDataBase().Del(key);
                 }
                 ConsoleHelper.WriteLine("移除kv值完成...");
                 #endregion
 
 
                 #region hashset
                 string hid = "wenli";
 
                 ConsoleHelper.WriteLine("回车开始HashSet插值操作...");
                 ConsoleHelper.ReadLine();
                 for (int i = 0; i < 1000; i++)
                 {
                     redisClient.GetDataBase().HSet(hid, "key" + i, "val" + i);
                 }
                 ConsoleHelper.WriteLine("HashSet插值完成...");
 
                 ConsoleHelper.WriteLine("回车开始HashSet插值操作...");
                 ConsoleHelper.ReadLine();
                 var hkeys = redisClient.GetDataBase().GetHKeys(hid).Data.ToArray();
                 foreach (var hkey in hkeys)
                 {
                     var val = redisClient.GetDataBase().HGet(hid, hkey);
                     ConsoleHelper.WriteLine("HGet val:" + val.Data);
                 }
 
                 var hall = redisClient.GetDataBase().HGetAll("wenli");
                 ConsoleHelper.WriteLine("HashSet查询完成...");
 
                 ConsoleHelper.WriteLine("回车开始HashSet移除操作...");
                 ConsoleHelper.ReadLine();
                 foreach (var hkey in hkeys)
                 {
                     redisClient.GetDataBase().HDel(hid, hkey);
                 }
                 ConsoleHelper.WriteLine("HashSet移除完成...");
 
 
                 #endregion
 
 
                 //redisConnection.GetDataBase().Suscribe((c, m) =>
                 //{
                 //    ConsoleHelper.WriteLine("channel:{0} msg:{1}", c, m);
                 //    redisConnection.GetDataBase().UNSUBSCRIBE(c);
                 //}, "c39654");
 
 
                 ConsoleHelper.WriteLine("测试完成!");
             }
             else
             {
                 ConsoleHelper.WriteLine("连接失败!");
             }
         }
     }
 }

View Code

经过上面的代码测试,使用redis-cli工具进行monitor命令监控发现——搞定了!

另外源码本人已发到github上面了,SAEA.RedisSocket的详细可查看:https://github.com/yswenli/SAEA/tree/master/Src/SAEA.RedisSocket


上一篇 下一篇