博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop源码分析8: IPC流程(3)客户端的clients、connections、calls复用
阅读量:4546 次
发布时间:2019-06-08

本文共 8923 字,大约阅读时间需要 29 分钟。

1. RPCClientCache 中的 clients
publicclass 
RPCClientCache {
   private 
Map<SocketFactory,Client> 
clients = new 
HashMap<SocketFactory,Client>();
   
    synchronizedClient 
getClient(Configuration conf,
      SocketFactory factory) {
     // Construct & cacheclient.  The configuration is only used fortimeout,
     // and Clients haveconnection pools.  So we can either (a) losesome
     // connection pooling andleak sockets, or (b) use the same timeout for all
     // configurations. Since the IPC is usually intended globally,not
     // per-job, we choose(a).
     Client client= 
clients.get(factory);
     if (client == null) {
       client =new Client(ObjectWritable.class, conf, factory);
      
clients.put(factory,client);
     } else {
      client.incCount();
     }
     return client;
   }
      
   void 
stopClient(Clientclient) {
     synchronized (this) {
      client.decCount();
       if(client.isZeroReference()) {
        
clients.remove(client.getSocketFactory());
       }
     }
     if (client.isZeroReference()){
      client.stop();
     }
   }
}
以上方法在下列代码中调用:
publicclass 
RPCInvoker implements 
InvocationHandler{
    privateClientConnectionId remoteId;
    private Clientclient;
    private boolean isClosed= false;
   public 
RPCInvoker(Class<? extendsVersionedProtocol> protocol,
       InetSocketAddress address, UserGroupInformationticket,
       Configuration conf, SocketFactory factory,
       int rpcTimeout) throws IOException {
     this.remoteId = ClientConnectionId.getConnectionId(address,protocol,
         ticket, rpcTimeout,conf);
     this.client = RPC.CLIENTS.
getClient(conf, factory);
    }  
  
    synchronizedvoid 
close() {
      if(!isClosed) {
       isClosed = true;
       
RPC.CLIENTS.stopClient(client);
     }
   } 
}
publicclass 
RPC {
  public staticRPCClientCache 
CLIENTS=newRPCClientCache();
   //for unit testing only
  staticClient 
getClient(Configuration conf){
  return 
CLIENTS.getClient(conf);
 } 
 
  public staticObject[] 
call(Method method, Object[][]params,
                         InetSocketAddress[]addrs, 
                         UserGroupInformation ticket,Configuration conf)
   throws IOException, InterruptedException {
   RPCInvocation[] invocations = newRPCInvocation[params.length];
   for (int i = 0; i < params.length; i++)
     invocations[i] = newRPCInvocation(method, params[i]);
   Client client= 
CLIENTS.getClient(conf);
   try {
   Writable[] wrappedValues= 
     client.call(invocations,addrs, method.getDeclaringClass(), ticket, conf);
   
   if (method.getReturnType() == Void.TYPE) {
     return null;
   }
   Object[] values =
    (Object[])Array.newInstance(method.getReturnType(),wrappedValues.length);
   for (int i = 0; i < values.length; i++)
     if (wrappedValues[i] !=null)
       values[i]= ((ObjectWritable)wrappedValues[i]).get();
   
   return values;
   } finally {
    
CLIENTS.stopClient(client);
   }
  }
 
  public staticVersionedProtocol 
getProxy(
     Class<? extendsVersionedProtocol> protocol,
     long clientVersion,InetSocketAddress addr, UserGroupInformation ticket,
     Configuration conf,SocketFactory factory, int rpcTimeout) throws IOException {
   if (UserGroupInformation.isSecurityEnabled()){
    SaslRpcServer.init(conf);
   }
   VersionedProtocol proxy =
      (VersionedProtocol) Proxy.newProxyInstance(
          protocol.getClassLoader(),new Class[] { protocol },
         
 newRPCInvoker(protocol, addr, ticket, conf, factory,rpcTimeout));
   long serverVersion =proxy.getProtocolVersion(protocol.getName(), 
                                         clientVersion);
   if (serverVersion == clientVersion) {
     return proxy;
   } else {
     throw newRPCVersionMismatch(protocol.getName(),clientVersion, 
                          serverVersion);
   }
  }
}
2. Client 的connections
publicclass 
Client {
  
  public 
Hashtable
ClientConnectionId
ClientConnection> 
connections =new Hashtable<ClientConnectionId, ClientConnection>();
  public void 
stop() { 
 ..............
   // wake up all connections
   synchronized (
connections) {
     for (ClientConnection conn: 
connections.values()) {
      conn.interrupt();
     }
   }
   
   // wait until all connections are closed
   while (!
connections.isEmpty()) {
     try {
      Thread.sleep(100);
     } catch (InterruptedExceptione) {
     }
   }
  ....................
  }
    //for unit testing only
 Set<ClientConnectionId> 
getConnectionIds(){
   synchronized (
connections) {
    return 
connections.keySet();
   }
  }
    
  privateClientConnection 
getConnection(ClientConnectionIdremoteId,
                             ClientCall call)
                             throws IOException,InterruptedException {
   if (!running.get()) {
     // the client isstopped
     throw new IOException("Theclient is stopped");
   }
   ClientConnection connection;
  
   do {
     synchronized(
connections) {
       connection= 
connections.get(remoteId);
       if(connection == null) {
        connection = newClientConnection(remoteId,this);
        
connections.put(remoteId,connection);
       }
     }
   } while (!connection.addCall(call));
   
   //we don't invoke the method below inside"synchronized (connections)"
   //block above. The reason for that is if theserver happens to be slow,
   //it will take longer to establish a connectionand that will slow the
   //entire system down.
   connection.setupIOstreams();
   return connection;
  }
}
publicclass 
ClientConnection extends 
Thread {
  
  
   private synchronizedvoid 
close() {
     if(!shouldCloseConnection.get()) {
       return;
     }
     // release theresources
     // first thing to do;take theconnection out of the connection list
     synchronized(client.
connections) {
       if(
client.connections.get(remoteId) == this){
      
client.connections.remove(remoteId);
       }
     }
  。。。。。。
  } 
}
3. ClientConnection 的calls
publicclass 
ClientConnection extends 
Thread {
   private 
Hashtable<Integer,ClientCall> calls = new Hashtable<Integer,ClientCall>();
   
   public synchronizedboolean 
addCall(ClientCall call) {
     if(shouldCloseConnection.get())
       returnfalse;
    
calls.put(call.id,call);
     notify();
     return true;
   }
  
      
   private synchronizedboolean 
waitForWork() {
     if (
calls.isEmpty()&& !shouldCloseConnection.get() && client.running.get()) {
       longtimeout = maxIdleTime-
           (System.currentTimeMillis()-lastActivity.get());
       if(timeout>0) {
        try {
          wait(timeout);
        } catch (InterruptedException e) {}
       }
     }
     
     if (!
calls.isEmpty()&& !shouldCloseConnection.get() &&client.running.get()) {
       returntrue;
     } else if(shouldCloseConnection.get()) {
       returnfalse;
     } else if(
calls.isEmpty()) { // idle connection closed orstopped
      markClosed(null);
       returnfalse;
     } else { // get stopped butthere are still pending requests 
      markClosed((IOException)new IOException().initCause(
          newInterruptedException()));
       returnfalse;
     }
   }
    privatevoid 
receiveResponse() {
     if(shouldCloseConnection.get()) {
      return;
     }
     touch();
     
     try {
       int id =in.readInt();                 // try to read an id      
       ClientCallcall = 
calls.get(id);
       int state= in.readInt();     // read callstatus
       if (state== Status.SUCCESS.state) {
        Writable value =ReflectionUtils.newInstance(client.valueClass, client.conf);
        value.readFields(in);              // read value
        call.setValue(value);
        
calls.remove(id);
       } else if(state == Status.ERROR.state) {
        call.setException(newRemoteException(WritableUtils.readString(in),
                                       WritableUtils.readString(in)));
        
calls.remove(id);
       } else if(state == Status.FATAL.state) {
        // Close the connection
        markClosed(newRemoteException(WritableUtils.readString(in), 
                                 WritableUtils.readString(in)));
       }
     } catch (IOException e){
      markClosed(e);
     }
   } 
  
    
   private synchronizedvoid 
close() {
     if(!shouldCloseConnection.get()) {
       return;
     }
     // release theresources
     // first thing to do;take theconnection out of the connection list
     synchronized(client.connections) {
       if(client.connections.get(remoteId) == this) {
      client.connections.remove(remoteId);
       }
     }
     // close the streams andtherefore the socket
    IOUtils.closeStream(out);
    IOUtils.closeStream(in);
     disposeSasl();
     // clean up all calls
     if (closeException == null){
       if(!
calls.isEmpty()) {
       
        // clean up calls anyway
        closeException = new IOException("Unexpectedclosed connection");
        cleanupCalls();
       }
     } else {
       // log theinfo
      
       // cleanupcalls
      cleanupCalls();
     } 
   }
   
    
   privatevoid 
cleanupCalls() {
     Iterator<Entry<Integer,ClientCall>> itor= 
calls.entrySet().iterator() ;
     while (itor.hasNext()){
       ClientCallc = itor.next().getValue(); 
      c.setException(closeException); // local exception
      itor.remove();        
     }
   }
  
 
}
4.并发执行以下代码
public class
MyClient {
    public static void
main(String[] args) throws Exception {
      final InetSocketAddress addr =new InetSocketAddress("localhost",
MyServer.IPC_PORT);
      final Query query = (Query)RPC.getProxy(Query.class, MyServer.IPC_VER,
addr, new Configuration());
      
new Thread() {
          @Override
          public void
run() {
             FileStatusfileStatus1 = query.getFileStatus("/tmp/testIPC");
             System.out.println(fileStatus1);
              FileStatusfileStatus2 = query.getFileStatus("/tmp/testIPC2");
              System.out.println(fileStatus2);
           }
       }
.start();
       
new Thread() {
         @Override
          public void
run() {
              CPUStatus cpuStatus1 =query.getCPUStatus("Intel");
             System.out.println(cpuStatus1);
              CPUStatus cpuStatus2 =query.getCPUStatus("AMD");
             System.out.println(cpuStatus2);
          }
       }
.start();
       
new Thread() {
        @Override
         public void
run() {
         try {
               Queryquery2 = (Query) RPC.getProxy(Query.class,
              MyServer.IPC_VER, addr, newConfiguration());
               FileStatusfileStatus1 = query2.getFileStatus("/tmp/testIPC");
              System.out.println(fileStatus1);
               FileStatusfileStatus2 = query2.getFileStatus("/tmp/testIPC2");
              System.out.println(fileStatus2);
              RPC.stopProxy(query2);
           } catch(IOException e) {
               e.printStackTrace();
           }
       }
     }
.start();
      RPC.stopProxy(query);
  } 
}
以上三个线程可以共用同一个 
Client对象、ClientConnection线程 、ClientConnectionId对象,将ClientCall放在同一个calls中

转载于:https://www.cnblogs.com/leeeee/p/7276530.html

你可能感兴趣的文章
Nmap命令的实用范例
查看>>
7-1 查找整数编程总结
查看>>
安装PHP以及搭建博客(一)
查看>>
关于WORD文档的读取乱码问题
查看>>
[问题记录.dotnet]取网卡信息报错"找不到"-WMI - Not found
查看>>
Codeforces Round #254 (Div. 2):B. DZY Loves Chemistry
查看>>
linux 安装虚拟机
查看>>
Thinkphp5笔记二:创建模块
查看>>
centos 安装mysql
查看>>
Redis 禁用FLUSHALL FLUSHDB KEYS 命令
查看>>
Matlab中imread函数使用报错“不应为MATLAB 表达式”分析
查看>>
MFC ADO数据库操作
查看>>
图像质量评价-NQM和WPSNR
查看>>
面试准备——相关知识
查看>>
每日一字:悟
查看>>
CentOS7.6安装稳定版Nginx
查看>>
LeetCode 1002. Find Common Characters (查找常用字符)
查看>>
建立隐藏管理员用户
查看>>
android设置图文提醒功能
查看>>
ajax跨域提交
查看>>