
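Elasticsearch Client Source Code: Operation Flow, Part 1

Reading the ES source code requires some familiarity with the Guice dependency injection framework; a basic grasp is enough to follow along. The ES version discussed here is 2.4.1.

1 Initializing TransportClient

When working with Elasticsearch, we first create a Client, with code like this: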

private static Client getClient() {
    Settings settings = Settings.builder()
            .put("cluster.name", "SERVICE-ELASTICSEARCH-aae03d413f4744298859cd4245e4eda5")
            .put("client.transport.ping_timeout", "1200s")
            .build();
    Client client = null;
    try {
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("10.3.70.101"), 9300));
    } catch (UnknownHostException e) {
        e.printStackTrace();
    }
    return client;
}

In other words, this instantiates a TransportClient object. Let's look at the source of build():

public TransportClient build() {
    // omitted...
    final ThreadPool threadPool = new ThreadPool(settings);
    NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
    boolean success = false;
    try {
        ModulesBuilder modules = new ModulesBuilder();
        modules.add(new Version.Module(version));
        // plugin modules must be added here, before others or we can get crazy injection errors...
        for (Module pluginModule : pluginsService.nodeModules()) {
            modules.add(pluginModule);
        }
        modules.add(new PluginsModule(pluginsService));
        modules.add(new SettingsModule(this.settings));
        modules.add(new NetworkModule(namedWriteableRegistry));
        modules.add(new ClusterNameModule(this.settings));
        modules.add(new ThreadPoolModule(threadPool));
        modules.add(new TransportModule(this.settings, namedWriteableRegistry));
        modules.add(new SearchModule() {
            @Override
            protected void configure() {
                // noop
            }
        });
        modules.add(new ActionModule(true));
        modules.add(new ClientTransportModule(hostFailedListener));
        modules.add(new CircuitBreakerModule(this.settings));

        pluginsService.processModules(modules);

        Injector injector = modules.createInjector();
        final TransportService transportService = injector.getInstance(TransportService.class);
        transportService.start();
        transportService.acceptIncomingRequests();

        TransportClient transportClient = new TransportClient(injector); // instantiate the TransportClient object
        success = true;
        return transportClient;
    } finally {
        if (!success) {
            ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
        }
    }
}

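From this code we can see that Guice first wires up the dependencies of the relevant Modules. For example, the create, read, update and delete Actions you call are concrete implementations contained in ActionModule, and each Module takes care of a different job. Then TransportClient transportClient = new TransportClient(injector) instantiates the TransportClient object. Let's look at its constructor: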

private TransportClient(Injector injector) {
    super(injector.getInstance(Settings.class), injector.getInstance(ThreadPool.class), injector.getInstance(Headers.class));
    this.injector = injector;
    nodesService = injector.getInstance(TransportClientNodesService.class);
    proxy = injector.getInstance(TransportProxyClient.class); // the proxy class, which actually carries out execute
}

nodesService is the class that handles node operations, and proxy is the proxy class that actually runs the execute method. So let's look at TransportProxyClient.
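To make the "add modules, create the injector, then ask it for instances" sequence in build() more concrete, here is a minimal sketch in plain Java. It only imitates the idea and does not use Guice: ToyModule, ToyBinder, ToyInjector, ToySettings and ToyTransportService are all hypothetical names invented for this illustration, and real Guice resolves constructor parameters by itself rather than through hand-written provider functions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy imitation of "modules contribute bindings, the injector builds objects".
// None of these types belong to Guice or Elasticsearch.
interface ToyModule {
    void configure(ToyBinder binder);
}

final class ToyBinder {
    final Map<Class<?>, Function<ToyInjector, ?>> providers = new HashMap<>();

    <T> void bind(Class<T> type, Function<ToyInjector, ? extends T> provider) {
        providers.put(type, provider);
    }
}

final class ToyInjector {
    private final Map<Class<?>, Function<ToyInjector, ?>> providers;
    private final Map<Class<?>, Object> singletons = new HashMap<>();

    ToyInjector(List<ToyModule> modules) {
        ToyBinder binder = new ToyBinder();
        for (ToyModule module : modules) {
            module.configure(binder); // every module adds its own bindings
        }
        this.providers = binder.providers;
    }

    <T> T getInstance(Class<T> type) {
        Object instance = singletons.get(type);
        if (instance == null) {
            Function<ToyInjector, ?> provider = providers.get(type);
            if (provider == null) {
                throw new IllegalStateException("No binding for " + type.getName());
            }
            instance = provider.apply(this); // may resolve further dependencies
            singletons.put(type, instance);  // cache: one instance per type
        }
        return type.cast(instance);
    }
}

// Hypothetical stand-ins for Settings and TransportService.
final class ToySettings {
    final String clusterName;
    ToySettings(String clusterName) { this.clusterName = clusterName; }
}

final class ToyTransportService {
    final ToySettings settings;
    ToyTransportService(ToySettings settings) { this.settings = settings; }
}

final class ToyModulesDemo {
    static ToyInjector build(String clusterName) {
        List<ToyModule> modules = new ArrayList<>();
        // Plays the role of SettingsModule.
        modules.add(binder -> binder.bind(ToySettings.class,
                injector -> new ToySettings(clusterName)));
        // Plays the role of TransportModule; its object depends on ToySettings.
        modules.add(binder -> binder.bind(ToyTransportService.class,
                injector -> new ToyTransportService(injector.getInstance(ToySettings.class))));
        return new ToyInjector(modules);
    }

    public static void main(String[] args) {
        ToyInjector injector = build("demo-cluster");
        ToyTransportService service = injector.getInstance(ToyTransportService.class);
        System.out.println(service.settings.clusterName);
    }
}
```

The point of the sketch is only the shape of the flow: nothing is constructed while modules are being added, and objects appear lazily once getInstance is called, with dependencies pulled from the same injector.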

2 TransportProxyClient

@Inject
public TransportProxyClient(Settings settings, TransportService transportService, TransportClientNodesService nodesService, Map<String, GenericAction> actions) {
    this.nodesService = nodesService;
    MapBuilder<Action, TransportActionNodeProxy> actionsBuilder = new MapBuilder<>();
    for (GenericAction action : actions.values()) {
        if (action instanceof Action) {
            actionsBuilder.put((Action) action, new TransportActionNodeProxy(settings, action, transportService));
        }
    }
    this.proxies = actionsBuilder.immutableMap();
}

TransportProxyClient uses constructor injection: Settings is bound by SettingsModule, TransportService by TransportModule, TransportClientNodesService by ClientTransportModule, and Map<String, GenericAction> actions is produced by the bindings in ActionModule. The proxies field is the important one: it is a Map whose keys are Actions and whose values are TransportActionNodeProxy instances. So what are these Actions?
Recall that many modules were bound when TransportClient was initialized, and one of them is ActionModule. Its configure function looks like this:

protected void configure() {

        Multibinder<ActionFilter> actionFilterMultibinder = Multibinder.newSetBinder(binder(), ActionFilter.class);
        for (Class<? extends ActionFilter> actionFilter : actionFilters) {
            actionFilterMultibinder.addBinding().to(actionFilter);
        }
        bind(ActionFilters.class).asEagerSingleton();
        bind(AutoCreateIndex.class).asEagerSingleton();
        bind(DestructiveOperations.class).asEagerSingleton();
        registerAction(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
        registerAction(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
        registerAction(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
        registerAction(ListTasksAction.INSTANCE, TransportListTasksAction.class);
        registerAction(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class);

        registerAction(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class);
        registerAction(ClusterStateAction.INSTANCE, TransportClusterStateAction.class);
        registerAction(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class);
        registerAction(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class);
        registerAction(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class);
        registerAction(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class);
        registerAction(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class);
        registerAction(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class);
        registerAction(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class);
        registerAction(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class);
        registerAction(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class);
        registerAction(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class);
        registerAction(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class);
        registerAction(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class);
        registerAction(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class);
        registerAction(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class);

        registerAction(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class);
        registerAction(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class);
        registerAction(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class);
        registerAction(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class);
        registerAction(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class);
        registerAction(GetIndexAction.INSTANCE, TransportGetIndexAction.class);
        registerAction(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class);
        registerAction(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
        registerAction(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
        registerAction(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
        registerAction(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
        registerAction(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class, TransportGetFieldMappingsIndexAction.class);
        registerAction(PutMappingAction.INSTANCE, TransportPutMappingAction.class);
        registerAction(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class);
        registerAction(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class);
        registerAction(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class);
        registerAction(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class);
        registerAction(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class);
        registerAction(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class);
        registerAction(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class);
        registerAction(RefreshAction.INSTANCE, TransportRefreshAction.class);
        registerAction(FlushAction.INSTANCE, TransportFlushAction.class);
        registerAction(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class);
        registerAction(ForceMergeAction.INSTANCE, TransportForceMergeAction.class);
        registerAction(UpgradeAction.INSTANCE, TransportUpgradeAction.class);
        registerAction(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class);
        registerAction(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class);
        registerAction(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class);
        registerAction(PutWarmerAction.INSTANCE, TransportPutWarmerAction.class);
        registerAction(DeleteWarmerAction.INSTANCE, TransportDeleteWarmerAction.class);
        registerAction(GetWarmersAction.INSTANCE, TransportGetWarmersAction.class);
        registerAction(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class);
        registerAction(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class);
        registerAction(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class);

        registerAction(IndexAction.INSTANCE, TransportIndexAction.class);
        registerAction(GetAction.INSTANCE, TransportGetAction.class);
        registerAction(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class,
                TransportDfsOnlyAction.class);
        registerAction(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class,
                TransportShardMultiTermsVectorAction.class);
        registerAction(DeleteAction.INSTANCE, TransportDeleteAction.class);
        registerAction(ExistsAction.INSTANCE, TransportExistsAction.class);
        registerAction(SuggestAction.INSTANCE, TransportSuggestAction.class);
        registerAction(UpdateAction.INSTANCE, TransportUpdateAction.class);
        registerAction(MultiGetAction.INSTANCE, TransportMultiGetAction.class,
                TransportShardMultiGetAction.class);
        registerAction(BulkAction.INSTANCE, TransportBulkAction.class,
                TransportShardBulkAction.class);
        registerAction(SearchAction.INSTANCE, TransportSearchAction.class);
        registerAction(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class);
        registerAction(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class);
        registerAction(PercolateAction.INSTANCE, TransportPercolateAction.class);
        registerAction(MultiPercolateAction.INSTANCE, TransportMultiPercolateAction.class, TransportShardMultiPercolateAction.class);
        registerAction(ExplainAction.INSTANCE, TransportExplainAction.class);
        registerAction(ClearScrollAction.INSTANCE, TransportClearScrollAction.class);
        registerAction(RecoveryAction.INSTANCE, TransportRecoveryAction.class);
        registerAction(RenderSearchTemplateAction.INSTANCE, TransportRenderSearchTemplateAction.class);

        // Indexed scripts
        registerAction(PutIndexedScriptAction.INSTANCE, TransportPutIndexedScriptAction.class);
        registerAction(GetIndexedScriptAction.INSTANCE, TransportGetIndexedScriptAction.class);
        registerAction(DeleteIndexedScriptAction.INSTANCE, TransportDeleteIndexedScriptAction.class);

        registerAction(FieldStatsAction.INSTANCE, TransportFieldStatsTransportAction.class);

        // register Name -> GenericAction Map that can be injected to instances.
        MapBinder<String, GenericAction> actionsBinder
                = MapBinder.newMapBinder(binder(), String.class, GenericAction.class);
        for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
            actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
        }
        // ... omitted
    }

    public <Request extends ActionRequest, Response extends ActionResponse> void registerAction(GenericAction<Request, Response> action, Class<? extends TransportAction<Request, Response>> transportAction, Class... supportTransportActions) {
        actions.put(action.name(), new ActionEntry<>(action, transportAction, supportTransportActions));
    }

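Above, ActionModule registers each Action through the registerAction method, which adds it to the actions Map. MapBinder<String, GenericAction> then makes that data injectable into other instances, so the Map<String, GenericAction> is available when TransportProxyClient is constructed: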

// register Name -> GenericAction Map that can be injected to instances.
MapBinder<String, GenericAction> actionsBinder
        = MapBinder.newMapBinder(binder(), String.class, GenericAction.class);
for (Map.Entry<String, ActionEntry> entry : actions.entrySet()) {
    actionsBinder.addBinding(entry.getKey()).toInstance(entry.getValue().action);
}
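The relationship between the two maps (name to action on the ActionModule side, action to proxy on the TransportProxyClient side) can be sketched in plain Java as follows. This is a simplified illustration, not the ES implementation: ToyAction, ToyNodeProxy, ToyActionRegistry and ToyProxyClient are hypothetical stand-ins, the action names are made up, and the map is handed over by a direct constructor call instead of by Guice.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for GenericAction and TransportActionNodeProxy.
final class ToyAction {
    final String name;
    ToyAction(String name) { this.name = name; }
}

final class ToyNodeProxy {
    final ToyAction action;
    ToyNodeProxy(ToyAction action) { this.action = action; }
}

// Plays the role of ActionModule: collects actions under their names.
final class ToyActionRegistry {
    private final Map<String, ToyAction> actions = new LinkedHashMap<>();

    void registerAction(ToyAction action) {
        actions.put(action.name, action);
    }

    Map<String, ToyAction> asMap() {
        return Collections.unmodifiableMap(actions);
    }
}

// Plays the role of TransportProxyClient: receives the name -> action map
// and builds one proxy per action, keyed by the action object itself.
final class ToyProxyClient {
    private final Map<ToyAction, ToyNodeProxy> proxies;

    ToyProxyClient(Map<String, ToyAction> actions) {
        Map<ToyAction, ToyNodeProxy> built = new LinkedHashMap<>();
        for (ToyAction action : actions.values()) {
            built.put(action, new ToyNodeProxy(action));
        }
        this.proxies = Collections.unmodifiableMap(built);
    }

    ToyNodeProxy proxyFor(ToyAction action) {
        return proxies.get(action);
    }

    int proxyCount() {
        return proxies.size();
    }
}

final class ToyRegistryDemo {
    public static void main(String[] args) {
        ToyAction get = new ToyAction("toy/get");     // made-up action names
        ToyAction index = new ToyAction("toy/index");

        ToyActionRegistry registry = new ToyActionRegistry();
        registry.registerAction(get);
        registry.registerAction(index);

        ToyProxyClient client = new ToyProxyClient(registry.asMap());
        System.out.println(client.proxyCount());
        System.out.println(client.proxyFor(get).action.name);
    }
}
```

Because the proxies map is keyed by the action object, a later lookup works only with the same instance that was registered, which is consistent with the Action classes in the real code each exposing a single shared INSTANCE.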

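3 Request Flow

We regularly use ES create, read, update and delete operations as well as Admin operations, and all of them are ultimately executed through the proxy class TransportProxyClient. A Get operation is used here as the example; other Requests follow the same path.
First, a demo that issues a Get: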

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class App {
    public static void main(String[] args) {
        create();
    }

    public static void create() {
        Client client = getClient();
        getInfo(client);
        // getIndice(client);
    }

    private static void getInfo(Client client) {
        GetRequestBuilder getRequestBuilder = client.prepareGet("face_fixedperson", "Fixedperson", "126tc");
        GetResponse getFields = getRequestBuilder.execute().actionGet();
        String sourceAsString = getFields.getSourceAsString();
        System.out.println("-----" + sourceAsString);
    }

    private static void getIndice(Client client) {
        // Index index = new Index();
        AdminClient admin = client.admin();
        IndicesAdminClient indices = admin.indices();

        ListenableActionFuture<GetIndexResponse> execute = indices.prepareGetIndex().execute();
        GetIndexResponse getIndexResponse = execute.actionGet();
        String[] indices1 = getIndexResponse.getIndices();
        for (String in : indices1) {
            System.out.println(in);
        }
    }

    private static Client getClient() {
        Settings settings = Settings.builder().put("cluster.name", "SERVICE-ELASTICSEARCH-aae03d413f4744298859cd4245e4eda5")
            .put("client.transport.ping_timeout", "1200s").build();
        Client client = null;
        try {
            client = TransportClient.builder().settings(settings).build().addTransportAddress
                (new InetSocketTransportAddress(InetAddress.getByName("10.3.70.101"), 9300));
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return client;
    }
}

When getRequestBuilder.execute() runs, the call first reaches this method in ActionRequestBuilder:

public void execute(ActionListener<Response> listener) {
        client.execute(action, beforeExecute(request), listener);
    }

It then moves on to this method in AbstractClient:

public final <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        headers.applyTo(request);
        listener = threadedWrapper.wrap(listener);
        doExecute(action, request, listener);
    }

The doExecute called here is TransportClient's doExecute, which simply hands the actual execute work to TransportProxyClient.

@Override
    protected <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void doExecute(Action<Request, Response, RequestBuilder> action, Request request, ActionListener<Response> listener) {
        proxy.execute(action, request, listener);
    }
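The hand-off from AbstractClient.execute to TransportClient.doExecute is an instance of the template method pattern: a final method in the base class does the shared work and then calls an abstract hook. A minimal sketch of that shape follows, with requests and responses reduced to strings. All Toy* names are hypothetical and the trace list exists only to make the call order visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Stand-in for AbstractClient: a final entry point plus an abstract hook.
abstract class ToyAbstractClient {
    final List<String> trace = new ArrayList<>();

    final void execute(String action, String request, Consumer<String> listener) {
        trace.add("AbstractClient.execute"); // shared pre-processing would go here
        doExecute(action, request, listener);
    }

    protected abstract void doExecute(String action, String request, Consumer<String> listener);
}

// Stand-in for TransportProxyClient: the object that does the real work.
final class ToyProxy {
    void execute(String action, String request, Consumer<String> listener) {
        listener.accept(action + " handled " + request);
    }
}

// Stand-in for TransportClient: the hook only delegates to the proxy.
final class ToyTransportClient extends ToyAbstractClient {
    private final ToyProxy proxy = new ToyProxy();

    @Override
    protected void doExecute(String action, String request, Consumer<String> listener) {
        trace.add("TransportClient.doExecute");
        proxy.execute(action, request, listener);
    }
}

final class ToyDelegationDemo {
    public static void main(String[] args) {
        ToyTransportClient client = new ToyTransportClient();
        client.execute("get", "doc-1", response -> System.out.println(response));
        System.out.println(client.trace);
    }
}
```

Keeping execute final means every client subclass goes through the same pre-processing, while each subclass decides where the request is actually sent.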

Next, look at TransportProxyClient's execute method:

public <Request extends ActionRequest, Response extends ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder>> void execute(final Action<Request, Response, RequestBuilder> action, final Request request, ActionListener<Response> listener) {
    final TransportActionNodeProxy<Request, Response> proxy = proxies.get(action);
    nodesService.execute(new TransportClientNodesService.NodeListenerCallback<Response>() {
        @Override
        public void doWithNode(DiscoveryNode node, ActionListener<Response> listener) {
            proxy.execute(node, request, listener);
        }
    }, listener);
}

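The line TransportActionNodeProxy<Request, Response> proxy = proxies.get(action); looks up the TransportActionNodeProxy that belongs to the given Action; see the TransportProxyClient constructor above for how that map is built.
Next, nodesService.execute picks one node from the available nodes (the transport client documentation describes this as rotating through them round-robin rather than choosing strictly at random) and runs proxy.execute(node, request, listener), which sends the request to that node.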
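The callback style used by nodesService.execute, where the service chooses the node and the caller supplies what to do with it, can be sketched like this. It is a toy model under stated assumptions: ToyNodeCallback and ToyNodesService are hypothetical names, nodes are plain strings, and the counter-based rotation is an assumed selection strategy for illustration; retry on failure, which a real client would need, is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Stand-in for NodeListenerCallback: what to do once a node has been chosen.
interface ToyNodeCallback {
    void doWithNode(String node, Consumer<String> listener);
}

// Stand-in for TransportClientNodesService: owns the node list and the choice.
final class ToyNodesService {
    private final List<String> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    ToyNodesService(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("no nodes available");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    void execute(ToyNodeCallback callback, Consumer<String> listener) {
        // Assumed strategy: rotate through the nodes with a shared counter.
        // floorMod keeps the index valid even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), nodes.size());
        callback.doWithNode(nodes.get(index), listener);
    }
}

final class ToyNodeSelectionDemo {
    public static void main(String[] args) {
        ToyNodesService nodesService = new ToyNodesService(Arrays.asList("node-a", "node-b"));
        for (int i = 0; i < 3; i++) {
            // The callback plays the role of proxy.execute(node, request, listener).
            nodesService.execute(
                    (node, listener) -> listener.accept("get@" + node),
                    response -> System.out.println(response));
        }
    }
}
```

Separating "which node" from "what to send" is what lets a single nodes service serve every action proxy: the proxy never needs to know how nodes are tracked or chosen.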

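Overall, when ES initializes TransportClient it binds a variety of Modules and, in the same step, initializes the proxy class TransportProxyClient. When a Request is submitted through TransportClient, the proxy class does the real work: it selects one of the available nodes and sends the request to it. This is only the prelude to an ES operation; the details of the individual operations will be covered in later posts.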



Author: kason_zhang
Link: https://www.jianshu.com/p/32976e99bf4c

