手记

实现一个分布式调度系统-RPC(上)

上一篇主要介绍了一些Loadbalance和Ha策略:
https://www.imooc.com/article/274780
在我们的项目中,前面已经有了注册中心,每个work机器启动后会注册到我们的注册中心。
然后manager发起请求后会通过我们的loadbalance策略,会选择一台合适的work发起任务的
请求。
到这里我们发现我们少了如何让manager通知我们选择的work,因此这里这里需要实现一个
RPC的功能,可以让manager和work进行通信。
我们可以自己实现一个RPC,也可以使用现有的一些框架来实现。

这里我们先使用hadoop-common里面自带的RPC来实现我们的功能。下一次,我们会自己通过
netty来实现一个简单的的RPC功能

依赖

首先引入依赖包,可以去掉一些自己不要的包,以免冲突
   <hadoop-common.version>2.7.2</hadoop-common.version>
          <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>avro</artifactId>
                    <groupId>org.apache.avro</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-mapper-asl</artifactId>
                    <groupId>org.codehaus.jackson</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-core-asl</artifactId>
                    <groupId>org.codehaus.jackson</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-jaxrs</artifactId>
                    <groupId>org.codehaus.jackson</groupId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>javax.servlet.jsp</groupId>
                    <artifactId>jsp-api</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

具体的实现

  • 定义接口,使用hadoop 的 rpc 要继承VersionedProtocol
public interface ExecutorRun extends VersionedProtocol {

    //定义一个版本号
    public static long versionID = 1;

    Result<?> run(JobInfoParam jobInfo);

    Result<LogResult> log(String executorType, String logId, long pointer);

}
  • 传输的对象,使用hadoop 的 rpc要实现Writable接口,并且实现它的两个方法
public class JobInfoParam implements Serializable,Writable {

   private String id;

   private String logId;//没执行一次任务对应一次日志记录,用于唯一的标志

   private String jobFlow;   //任务流id

   private Integer jobGroup;//执行器id

   ......

   @Override
   public void write(DataOutput dataOutput) throws IOException {
       String jsonStr = JsonMapper.obj2String(this);
       WritableUtils.writeString(dataOutput, jsonStr);
   }

   @Override
   public void readFields(DataInput dataInput) throws IOException {
       String jsonStr = WritableUtils.readString(dataInput);
       JobInfoParam event = JsonMapper.string2Obj(jsonStr, JobInfoParam.class);
       BeanUtils.copyProperties(event, this);
   }
}
  • 服务端实现
@SpiMeta(name = "hadoopServer")
@Slf4j
public class HadoopServer implements RpcServer {

    private Integer port;

    private String host;//当前执行器的地址

    private String logPath;

    private RPC.Server server = null;

    @Override
    public void init(Integer port, String server, String logPath) {
        this.port = port;
        this.host = server;
        this.logPath=logPath;
    }

    public void start(CountDownLatch countDownLatch) {

        // 创建一个RPC builder
        RPC.Builder builder = new RPC.Builder(new Configuration());

        //指定RPC Server的参数
        builder.setBindAddress(host);
        builder.setPort(port);

        builder.setProtocol(ExecutorRun.class);
        builder.setInstance(new ExecutorRunImpl(new StringBuilder().
                append(host).append(":").
                append(port).toString(), logPath));

        //创建Server
        try {
            server= builder.build();
            //启动服务
            server.start();
            countDownLatch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
            log.error("hadoop rpc start error!!!");
        }
    }

    @Override
    public void closeResource() {
        if(server!=null){
            server.stop();
        }
    }
}
  • 客户端实现,获取到work的地址,然后获取远程访问的本地代理
 public <T> T getProxy(Class<T> clz, Cluster cluster) {

        LoadBalance loadBalance=cluster.getLoadBalance();
        RequestPacket request=new RequestPacket();
        request.setRequestId(RequestIdGenerator.getRequestId());
        Endpoint endpoint=loadBalance.select(request);
        //得到服务器端的一个代理对象
        try {
            return (T) RPC.getProxy(clz,ExecutorRun.versionID ,new InetSocketAddress(endpoint.getHost(), endpoint.getPort()),
                    new Configuration());
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }

总结

可以看到很方便就实现了我们需要的功能
下次我们基于netty自己实现一个RPC的功能
完整代码见github: https://github.com/lizu18xz/faya-job
0人推荐
随时随地看视频
慕课网APP