thrift rpc 四种范例的服务端的实现具体介绍
这里主要是使用 thrift 开发的时间服务器端的实现,以及 thrift 提供给我们多钟的服务的实现,以及每个实现的服务器的特点和 API 介绍,TServer 主要包罗以下几种实现
- TSimpleServer
- 阻塞的但线程模式,他并没有实战价值,只是对于学习 thrift 服务器的简单入门是十分友爱的
- TThreadPoolServer
- TNonblockingServer
- TSelectorServer
- 他的实现主要是 selector 模型,主从 selector 模式,跟我们了解过的 netty 中使用的设计十分类似
TSimpleServer
既然他是一个没有实战价值的服务实现,那为什么我们要学习他兵器在入门阶段使用它呢,因为就是因为它的简单,才利于学习,利于我们了解 thrift 中对于服务的设计,对于接下来对多线程乃至主从 selector 的学习也会有很好的铺垫作用。
TSimpleServer 的内部实现原理
内部的实现是依赖 JDK 的 SocketServer + accept 并且没有使用 thread 来提高服从,是一个阻塞的但线程模式。
- // 打开查看方法 org.apache.thrift.server.TSimpleServer#serve
- // 这里有一个执行的片段 client = serverTransport_.accept(); 这就是实现 accept 的核心逻辑
复制代码 TThreadPoolServer
线程池的版本他使用了一个线程池来解决的阻塞的请求,但是他也是阻塞的,一个线程池只能处置惩罚一个请求,它的大致流程 accept -> 吸收到请求 -> 分配线程实行 它的好处不言而喻可以同时提供给多个客户端的服务,但是毛病是如果线程池的设置太多会占用太多资源,如果某个线程发生阻塞这个时间只能阻塞等待这个线程实行完毕,但是如果太少又没有办法给更多的客户端提供服务
- private static void tThreadPoolServer() throws TTransportException {
- // TTransportFactory
- try (final TServerTransport serverTransport = new TServerSocket(9090)) {
- final TThreadPoolServer threadPoolServer = new TThreadPoolServer(
- new TThreadPoolServer.Args(serverTransport)
- .protocolFactory(new TBinaryProtocol.Factory())
- .processor(new UserService.Processor<>(new UserServiceImpl()))
- );
- threadPoolServer.serve();
- }
- }
复制代码 和 simple 的版本最紧张的区别就是在 TServer 使用的是 TThreadPoolServer,他内置一个线程池
- /**
- * 可以在构造器中看到有一个线程池的创建和复制逻辑
- */
- public TThreadPoolServer(Args args) {
- super(args);
- stopTimeoutUnit = args.stopTimeoutUnit;
- stopTimeoutVal = args.stopTimeoutVal;
- // 赋值线程池
- executorService_ =
- args.executorService != null ? args.executorService : createDefaultExecutorService(args);
- }
复制代码 他的线程池创建逻辑
- public int minWorkerThreads = 5;
- public int maxWorkerThreads = Integer.MAX_VALUE;
- private static ExecutorService createDefaultExecutorService(Args args) {
- return new ThreadPoolExecutor(
- args.minWorkerThreads, // 默认的核心线程
- args.maxWorkerThreads, // 最大的线程池
- 60L,
- TimeUnit.SECONDS,
- new SynchronousQueue<>(),
- // 线程工厂
- new ThreadFactory() {
- final AtomicLong count = new AtomicLong();
- @Override
- public Thread newThread(Runnable r) {
- Thread thread = new Thread(r);
- thread.setDaemon(true);
- thread.setName (
- String.format("TThreadPoolServer WorkerProcess-%d", count.getAndIncrement()));
- return thread;
- }
- });
- }
- public Args maxWorkerThreads(int n) {
- maxWorkerThreads = n;
- return this;
- }
复制代码 我们可以看到最大的默认线程池的数量是 Integer.MAX_VALUE,但是是可以配置的,我们尽量不要使用它的默认配置可以本身创建线程池,如果不本身创建可以在使用默认线程池的时间指定最大的线程池,使用方法 maxWorkerThreads,防止在使用的时间出现 OOM
实行流程
accept -> 分配 worker 线程实行任务(这里面的内容就是但线程的实行流程)
非阻塞 IO
在 thrift 的规定中,非阻塞的 server 必须使用的 tTransport 是 TFramedTransport,但是如果使用 TFramedTransport 必须对应的 tProtocol 则必须是搭配的是 TCompactProtocol,因此在客户端的选择上肯定也是要使用的网络传输 -> TFramedTransport,使用的协议肯定自然也是 -> TCompactProtocol,但是为什么会存在如许逼迫的搭配选择呢?大概官方的意图是既然选择了 NIO 的模式肯定是选择了更高效的模式,所以一不做二不休,直接一次性到位,必须搭配对应更高效的压缩的模式来传输数据。
TNonBlockingServer
TNonBlockingServer 是一个非阻塞 IO,底层的实现是使用的 NIO,但是他使用的是但线程,并没有使用多线程,这就导致在实际生产中并不会选择这个作为我们的服务端的实现方式。不外可以见到看一下他们的编码实现方式
服务端的代码实现
- public static void main(String[] args) throws TTransportException {
- tNonBlockingServer();
- }
- private static void tNonBlockingServer() throws TTransportException {
- try (final TNonblockingServerTransport framedTransport = new TNonblockingServerSocket(9090)) {
- final TNonblockingServer nonblockingServer = new TNonblockingServer(
- new TNonblockingServer.Args(framedTransport)
- .protocolFactory(new TCompactProtocol.Factory())
- .processor(new UserService.Processor<>(new UserServiceImpl()))
- );
- framedTransport.accept();
- nonblockingServer.serve();
- }
- }
复制代码 客户端的代码实现
- public static void main(String[] args) {
- simpleNioClient();
- }
- public static void simpleNioClient() {
- try (TFramedTransport tFramedTransport = new TFramedTransport(new TSocket("localhost", 9090))) {
- tFramedTransport.open();
- System.out.println(new UserService.Client(new TCompactProtocol(tFramedTransport))
- .queryUserByNameAndPassword("new User()", "222"));
- } catch (TException e) {
- throw new RuntimeException(e);
- }
- }
复制代码 TThreadSelectorServer
TThreadSelectorServer 其实底层的实现是一个主从的 Reactor 模型,一下是他的架构实现图,关于主从 reactor 的调度流程
- /**
- * 服务端的代码实现
- */
- private static void tThreadSelectorServer() throws TTransportException {
- try (TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(9090)) {
- serverSocket.accept();
- final TThreadedSelectorServer selectorServer = new TThreadedSelectorServer(
- new TThreadedSelectorServer.Args(serverSocket)
- .protocolFactory(new TCompactProtocol.Factory())
- .processor(new UserService.Processor<>(new UserServiceImpl()))
- );
- selectorServer.serve();
- }
- }
复制代码 默认的实现线程池大小是 5,可以配置 workerThreads 这个其实就是指定的焦点线程数。默认的是 5 使用的创建线程池的方式为 new ThreadPoolExecutor(...),但是也可以本身创建线程池,使用 executorService 这个参数进行配置。底层其实和 TNonBlockingServer 是一样的,知识在单线程的底子上增长了线程池的支持使其在实行 work 任务的时间可以更高效的使用线程池来实行任务。
留意点地方
- 客户端的 thrift 版本必须和服务端使用的 thrift 的版本一致
- 客户端使用的纯属和协议必须和服务端的不停,也就是 tTransport,tProtocol
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |