手记

Java分布式学习入门指南

概述

本文详细介绍了Java分布式系统的各个方面,包括分布式系统概述、Java在分布式系统中的应用、分布式架构模式、服务发现与注册、负载均衡与路由、以及分布式缓存机制。文章还通过实战案例展示了如何使用Spring Boot和Docker搭建分布式应用,并深入探讨了分布式数据存储和ZooKeeper的使用方法。通过这些内容,读者可以全面了解和掌握Java分布式学习的相关知识。

Java分布式系统简介

分布式系统概述

分布式系统是一种系统,它包含多个通过网络互连的独立计算机或节点,它们能够协同工作以实现特定目标。这些节点之间通过网络进行通信,以完成单个节点无法完成的任务。分布式系统的关键特点是,它们能够提供比单一系统更高的性能和可用性。通过将任务分配到多个节点,分布式系统可以在多个方面提供优势,例如负载均衡、容错和扩展性。

Java在分布式系统中的应用

Java语言因其平台独立性、强大的类库和丰富的开发工具,在分布式系统中得到了广泛应用。Java提供了多种机制来支持分布式应用的开发,例如Java RMI(远程方法调用)、Socket编程、Java NIO(非阻塞I/O)和Java EE(企业版)等。这些技术使得Java成为开发分布式应用的理想选择。

Java RMI示例

Java RMI允许远程对象之间进行分布式通信。以下是一个简单的RMI示例,展示了如何定义、实现和注册远程对象,以及如何通过客户端调用远程方法。

定义远程接口:

import java.rmi.Remote;
import java.rmi.RemoteException;

public interface HelloWorld extends Remote {
    String sayHello() throws RemoteException;
}

实现远程接口:

import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;

public class HelloWorldImpl extends UnicastRemoteObject implements HelloWorld {
    protected HelloWorldImpl() throws RemoteException {
        super();
    }

    @Override
    public String sayHello() throws RemoteException {
        return "Hello, world!";
    }
}

注册远程对象:

import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;

public class HelloWorldServer {
    public static void main(String[] args) {
        try {
            HelloWorldImpl hw = new HelloWorldImpl();
            Registry registry = LocateRegistry.createRegistry(1099);
            registry.rebind("HelloWorld", hw);
            System.out.println("Server ready.");
        } catch (Exception e) {
            System.err.println("Server exception: " + e.toString());
            e.printStackTrace();
        }
    }
}

调用远程方法:

import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;

public class HelloWorldClient {
    public static void main(String[] args) {
        try {
            Registry registry = LocateRegistry.getRegistry("localhost", 1099);
            HelloWorld hw = (HelloWorld) registry.lookup("HelloWorld");
            System.out.println(hw.sayHello());
        } catch (Exception e) {
            System.err.println("Client exception: " + e.toString());
            e.printStackTrace();
        }
    }
}

分布式系统的优点与挑战

优点

  1. 高可用性:分布式系统可以提供比单一系统更高的可用性,因为即使某个节点出现故障,其他节点仍能继续提供服务。
  2. 可伸缩性:通过增加更多的节点,可以轻松地扩展分布式系统以处理更多负载。
  3. 负载均衡:分布式系统可以将任务分配给多个节点,从而实现负载均衡,避免单个节点过载。
  4. 容错性:分布式系统可以设计为具有容错性,能够在节点故障时仍然提供服务。
  5. 灵活性:分布式系统可以根据需求动态添加或移除节点,提高了系统的灵活性。

挑战

  1. 复杂性:分布式系统的设计和实现比单一系统更复杂,需要处理网络延迟、节点故障和数据一致性等问题。
  2. 数据一致性:在分布式系统中保持数据一致性是一个挑战,特别是在节点之间可能存在网络延迟或故障的情况下。
  3. 安全性和隐私:分布式系统需要保护节点之间的通信安全,并确保数据的隐私。
  4. 部署和维护:分布式系统的部署和维护比单一系统更为复杂,需要考虑节点的部署位置、网络配置和数据备份等问题。
分布式开发基础

Java网络编程基础

Java网络编程是开发分布式应用的基础。Java提供了多种网络编程技术,包括传统的Socket编程和基于NIO的异步编程。下面我们将介绍Java网络编程的基础知识和示例代码。

Socket编程示例

Socket编程是Java网络编程的基础。以下是一个简单的Socket编程示例,演示了如何创建一个服务器端和一个客户端。

创建服务器端:

import java.io.*;
import java.net.*;

public class SimpleServer {
    public static void main(String[] args) throws IOException {
        ServerSocket serverSocket = new ServerSocket(8000);
        System.out.println("Server started, waiting for connection...");

        Socket clientSocket = serverSocket.accept();
        System.out.println("Client connected.");

        PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
        BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));

        String inputLine;
        while ((inputLine = in.readLine()) != null) {
            System.out.println("Received: " + inputLine);
            out.println("Echo: " + inputLine);
        }

        clientSocket.close();
        serverSocket.close();
    }
}

创建客户端:

import java.io.*;
import java.net.*;

public class SimpleClient {
    public static void main(String[] args) throws IOException {
        Socket socket = new Socket("localhost", 8000);
        PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
        BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

        out.println("Hello, server!");
        String response = in.readLine();
        System.out.println("Received: " + response);

        socket.close();
    }
}

Java NIO示例

Java NIO(New I/O)是Java提供的一种异步编程模型,适用于高并发场景。以下是一个简单的NIO示例,展示了如何使用NIO实现一个简单的服务器端。

创建NIO服务器端:

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class SimpleNioServer {
    public static void main(String[] args) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(8001));

        Selector selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();

                if (key.isAcceptable()) {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead;
                    while ((bytesRead = socketChannel.read(buffer)) > 0) {
                        buffer.flip();
                        System.out.println("Received: " + new String(buffer.array(), 0, bytesRead));
                        buffer.clear();
                    }
                    socketChannel.close();
                }
            }
        }
    }
}

Java并发编程基础

Java并发编程是开发分布式应用中另一个重要的方面。Java提供了多种并发机制,包括线程、锁、并发容器和并发工具类。以下我们将介绍Java并发编程的基础知识和示例代码。

创建和管理线程

Java提供了多种方式来创建和管理线程。以下是一个简单的线程示例,演示了如何创建并启动一个线程。

创建线程示例:

public class SimpleThread extends Thread {
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.println("Thread: " + getName() + " - " + i);
        }
    }

    public static void main(String[] args) {
        SimpleThread thread = new SimpleThread();
        thread.start();
    }
}

使用锁控制并发访问

Java提供了多种锁机制来控制并发访问,确保多个线程能够安全地访问共享资源。以下是一个简单的锁示例,演示了如何使用synchronized关键字来控制并发访问。

使用锁示例:

public class SimpleLock {
    private int count = 0;
    private final Object lock = new Object();

    public void increment() {
        synchronized (lock) {
            count++;
        }
    }

    public int getCount() {
        synchronized (lock) {
            return count;
        }
    }

    public static void main(String[] args) {
        SimpleLock lock = new SimpleLock();
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                lock.increment();
            }
        });
        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                lock.increment();
            }
        });

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Count: " + lock.getCount());
    }
}

并发容器和并发工具类

Java提供了多种并发容器和并发工具类来简化并发编程。以下是一个简单的并发容器示例,演示了如何使用ConcurrentHashMap来实现线程安全的映射。

使用并发容器示例:

import java.util.concurrent.*;

public class SimpleConcurrentMap {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                map.put("Key" + i, i);
            }
        });

        Thread thread2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                map.put("Key" + i, i);
            }
        });

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("Map size: " + map.size());
    }
}
分布式架构模式

服务发现与注册

服务发现与注册是分布式系统中的一种常见模式,用于解决服务发现和负载均衡等问题。服务发现是指客户端能够自动找到并连接到可用的服务实例;服务注册则是服务实例向服务注册中心注册自己的信息,以便其他服务能够找到它。

服务注册与发现示例

以下是一个简单的服务注册与发现示例,使用了ZooKeeper作为服务注册中心。

创建服务注册中心(ZooKeeper):

docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper

服务注册示例:

import org.apache.zookeeper.*;
import org.apache.zookeeper.client.ZooKeeperClient;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class SimpleServiceRegistry {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String SERVICE_PATH = "/services";
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_ADDRESS, 3000, event -> {
            if (event.getType() == Watcher.Event.KeeperState.SyncConnected) {
                latch.countDown();
            }
        });

        latch.await();

        String serviceId = zk.create(SERVICE_PATH, "service1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        System.out.println("Service registered: " + serviceId);

        zk.close();
    }
}

服务发现示例:

import org.apache.zookeeper.*;
import org.apache.zookeeper.client.ZooKeeperClient;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SimpleServiceDiscovery {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String SERVICE_PATH = "/services";
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_ADDRESS, 3000, event -> {
            if (event.getType() == Watcher.Event.KeeperState.SyncConnected) {
                latch.countDown();
            }
        });

        latch.await();

        List<String> serviceIds = zk.getChildren(SERVICE_PATH, true);

        for (String serviceId : serviceIds) {
            byte[] data = zk.getData(SERVICE_PATH + "/" + serviceId, false, new Stat());
            System.out.println("Service found: " + new String(data));
        }

        zk.close();
    }
}

负载均衡与路由

负载均衡和路由是分布式系统中常见的模式,用于将请求分发到多个服务实例上。负载均衡可以使用多种算法实现,例如轮询、随机和最少连接等。

负载均衡示例

以下是一个简单的负载均衡示例,展示了如何使用轮询算法来实现负载均衡。

创建负载均衡器:

import java.util.*;

public class SimpleLoadBalancer {
    private List<String> serviceIds = new ArrayList<>();
    private Random random = new Random();

    public void addService(String serviceId) {
        serviceIds.add(serviceId);
    }

    public String getLoadBalancedService() {
        return serviceIds.get(random.nextInt(serviceIds.size()));
    }

    public static void main(String[] args) {
        SimpleLoadBalancer lb = new SimpleLoadBalancer();
        lb.addService("service1");
        lb.addService("service2");
        lb.addService("service3");

        for (int i = 0; i < 10; i++) {
            String service = lb.getLoadBalancedService();
            System.out.println("Load balanced service: " + service);
        }
    }
}

分布式缓存机制

分布式缓存机制是分布式系统中常见的模式,用于存储和检索频繁访问的数据,以提高系统的性能和响应速度。常见的分布式缓存系统有Redis、Memcached等。

通过Redis实现分布式缓存示例

以下是一个简单的Redis分布式缓存示例,展示了如何使用Redis客户端库Jedis来实现分布式缓存。

创建Redis客户端示例:

import redis.clients.jedis.*;

public class SimpleRedisCache {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");

        // 设置缓存值
        jedis.set("key", "value");

        // 获取缓存值
        String value = jedis.get("key");
        System.out.println("Cached value: " + value);

        // 删除缓存值
        jedis.del("key");

        jedis.close();
    }
}

使用Memcached实现分布式缓存示例

以下是一个简单的Memcached分布式缓存示例,展示了如何使用Memcached客户端库spymemcached来实现分布式缓存。

创建Memcached客户端示例:

import net.spy.memcached.*;

public class SimpleMemcachedCache {
    private static final String MEMCACHED_SERVER = "localhost";
    private static final int MEMCACHED_PORT = 11211;

    public static void main(String[] args) {
        MemcachedClient memcachedClient = new MemcachedClient(new InetSocketAddress(MEMCACHED_SERVER, MEMCACHED_PORT));

        // 设置缓存值
        memcachedClient.set("key", 0, "value");

        // 获取缓存值
        String value = memcachedClient.get("key").toString();
        System.out.println("Cached value: " + value);

        // 删除缓存值
        memcachedClient.delete("key");

        memcachedClient.shutdown();
    }
}
实战案例:实现一个简单的分布式应用

设计一个分布式应用

设计一个简单的分布式应用需要考虑以下几个方面:服务的定义、服务的部署和调用、服务的发现与注册、负载均衡与路由。

服务定义

服务定义是指定义服务接口和实现。以下是一个简单的服务接口和实现示例。

服务接口:

public interface SimpleService {
    String sayHello(String name);
}

服务实现:

public class SimpleServiceImpl implements SimpleService {
    @Override
    public String sayHello(String name) {
        return "Hello, " + name + "!";
    }
}

服务部署和调用

服务部署是指将服务部署到多个节点上,服务调用是指客户端如何调用服务。以下是一个简单的服务部署和调用示例。

服务注册:

import org.apache.zookeeper.*;
import org.apache.zookeeper.client.ZooKeeperClient;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class SimpleServiceRegistry {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String SERVICE_PATH = "/services";
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_ADDRESS, 3000, event -> {
            if (event.getType() == Watcher.Event.KeeperState.SyncConnected) {
                latch.countDown();
            }
        });

        latch.await();

        String serviceId = zk.create(SERVICE_PATH, "service1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        System.out.println("Service registered: " + serviceId);

        zk.close();
    }
}

服务发现与调用:

import org.apache.zookeeper.*;
import org.apache.zookeeper.client.ZooKeeperClient;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SimpleServiceDiscovery {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String SERVICE_PATH = "/services";
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_ADDRESS, 3000, event -> {
            if (event.getType() == Watcher.Event.KeeperState.SyncConnected) {
                latch.countDown();
            }
        });

        latch.await();

        List<String> serviceIds = zk.getChildren(SERVICE_PATH, true);

        for (String serviceId : serviceIds) {
            byte[] data = zk.getData(SERVICE_PATH + "/" + serviceId, false, new Stat());
            String serviceAddress = new String(data);
            SimpleService service = new SimpleServiceStub(serviceAddress);
            String result = service.sayHello("world");
            System.out.println(result);
        }

        zk.close();
    }

    public static class SimpleServiceStub implements SimpleService {
        private final String serviceAddress;

        public SimpleServiceStub(String serviceAddress) {
            this.serviceAddress = serviceAddress;
        }

        @Override
        public String sayHello(String name) {
            // 实现服务调用逻辑
            return "Hello, " + name + "!";
        }
    }
}

使用Spring Boot搭建服务

Spring Boot是一个基于Spring框架的快速开发工具,可以轻松地创建独立的、生产级别的Spring应用。以下是一个简单的Spring Boot服务示例,展示了如何使用Spring Boot搭建服务。

创建Spring Boot服务:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@SpringBootApplication
@RestController
public class SimpleSpringBootService {
    public static void main(String[] args) {
        SpringApplication.run(SimpleSpringBootService.class, args);
    }

    @GetMapping("/hello")
    public String sayHello(@RequestParam String name) {
        return "Hello, " + name + "!";
    }
}

创建Spring Boot客户端:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
public class SimpleSpringBootClient {
    public static void main(String[] args) {
        SpringApplication.run(SimpleSpringBootClient.class, args);

        RestTemplate restTemplate = new RestTemplate();
        String result = restTemplate.getForObject("http://localhost:8080/hello?name=world", String.class);
        System.out.println(result);
    }
}

服务的部署与测试

服务部署是指将服务部署到多个节点上。以下是一个简单的服务部署示例,展示了如何使用Docker和Kubernetes部署服务。

创建Docker镜像:

FROM openjdk:11-jre-slim
COPY target/simple-spring-boot-service.jar /app/simple-spring-boot-service.jar
ENTRYPOINT ["java", "-jar", "/app/simple-spring-boot-service.jar"]

构建Docker镜像:

docker build -t simple-spring-boot-service .

运行Docker容器:

docker run -d -p 8080:8080 --name simple-spring-boot-service simple-spring-boot-service

使用Kubernetes部署服务:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-spring-boot-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: simple-spring-boot-service
  template:
    metadata:
      labels:
        app: simple-spring-boot-service
    spec:
      containers:
      - name: simple-spring-boot-service
        image: simple-spring-boot-service:latest
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: simple-spring-boot-service
spec:
  selector:
    app: simple-spring-boot-service
  ports:
  - name: http
    port: 8080
    targetPort: 8080
  type: LoadBalancer

部署Kubernetes资源:

kubectl apply -f simple-spring-boot-service-deployment.yaml

测试服务:

kubectl get services
kubectl get pods
curl http://<service-ip>:8080/hello?name=world
分布式数据存储

基础数据存储技术介绍

分布式数据存储技术有多种实现,例如关系型数据库、NoSQL数据库和分布式文件系统。以下我们将介绍几种常见的分布式数据存储技术。

关系型数据库

关系型数据库是一种基于表格的数据存储系统,支持事务处理和ACID特性。常见的关系型数据库有MySQL、PostgreSQL、Oracle等。

NoSQL数据库

NoSQL数据库是一种非关系型的数据存储系统,支持高并发和高可用性。常见的NoSQL数据库有MongoDB、Cassandra、Redis等。

分布式文件系统

分布式文件系统是一种跨多个节点的文件存储系统,支持高可用性和高可靠性。常见的分布式文件系统有HDFS、GlusterFS等。

使用Redis实现分布式缓存

Redis是一个开源的内存数据存储系统,支持多种数据结构和丰富的操作。以下是一个简单的Redis分布式缓存示例,展示了如何使用Redis客户端库Jedis来实现分布式缓存。

创建Redis客户端示例:

import redis.clients.jedis.*;

public class SimpleRedisCache {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost");

        // 设置缓存值
        jedis.set("key", "value");

        // 获取缓存值
        String value = jedis.get("key");
        System.out.println("Cached value: " + value);

        // 删除缓存值
        jedis.del("key");

        jedis.close();
    }
}

使用MySQL与分布式数据库

MySQL是一种常见的关系型数据库,支持ACID事务和多种存储引擎。以下是一个简单的MySQL分布式数据库示例,展示了如何使用Java连接MySQL数据库并执行SQL查询。

创建数据库连接示例:

import java.sql.*;

public class SimpleMySqlDatabase {
    public static void main(String[] args) throws SQLException {
        Connection conn = null;
        Statement stmt = null;

        try {
            String url = "jdbc:mysql://localhost:3306/mydb";
            String user = "root";
            String password = "password";

            conn = DriverManager.getConnection(url, user, password);
            stmt = conn.createStatement();

            String sql = "SELECT * FROM users";
            ResultSet rs = stmt.executeQuery(sql);

            while (rs.next()) {
                System.out.println(rs.getString("name"));
            }
        } finally {
            if (stmt != null) stmt.close();
            if (conn != null) conn.close();
        }
    }
}
分布式协调服务

ZooKeeper简介

ZooKeeper是一种开源的分布式协调服务,提供配置管理、命名和分布式同步等功能。ZooKeeper使用类似于文件系统的数据模型,支持多种操作,如创建、删除和更新节点。

ZooKeeper数据模型

ZooKeeper的数据模型类似于文件系统,支持多级目录结构和节点属性。每个节点可以包含数据和子节点,支持多种操作,如创建、删除和更新节点。

ZooKeeper主要特性

  • 一致性:ZooKeeper保证所有客户端能够看到一致的数据视图。
  • 原子性:ZooKeeper保证所有操作要么全部成功,要么全部失败。
  • 可靠性:ZooKeeper保证所有操作能够可靠地执行。
  • 持久性:ZooKeeper支持持久节点,节点数据能够持久化存储。
  • 临时性:ZooKeeper支持临时节点,节点数据在会话结束时自动删除。

使用ZooKeeper实现服务发现

以下是一个简单的服务发现示例,展示了如何使用ZooKeeper实现服务发现。

创建服务注册中心(ZooKeeper):

docker run -d --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 zookeeper

服务注册示例:

import org.apache.zookeeper.*;
import org.apache.zookeeper.client.ZooKeeperClient;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class SimpleServiceRegistry {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String SERVICE_PATH = "/services";
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_ADDRESS, 3000, event -> {
            if (event.getType() == Watcher.Event.KeeperState.SyncConnected) {
                latch.countDown();
            }
        });

        latch.await();

        String serviceId = zk.create(SERVICE_PATH, "service1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

        System.out.println("Service registered: " + serviceId);

        zk.close();
    }
}

服务发现示例:

import org.apache.zookeeper.*;
import org.apache.zookeeper.client.ZooKeeperClient;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SimpleServiceDiscovery {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String SERVICE_PATH = "/services";
    private static final CountDownLatch latch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        ZooKeeper zk = new ZooKeeper(ZK_ADDRESS, 3000, event -> {
            if (event.getType() == Watcher.Event.KeeperState.SyncConnected) {
                latch.countDown();
            }
        });

        latch.await();

        List<String> serviceIds = zk.getChildren(SERVICE_PATH, true);

        for (String serviceId : serviceIds) {
            byte[] data = zk.getData(SERVICE_PATH + "/" + serviceId, false, new Stat());
            String serviceAddress = new String(data);
            SimpleService service = new SimpleServiceStub(serviceAddress);
            String result = service.sayHello("world");
            System.out.println(result);
        }

        zk.close();
    }

    public static class SimpleServiceStub implements SimpleService {
        private final String serviceAddress;

        public SimpleServiceStub(String serviceAddress) {
            this.serviceAddress = serviceAddress;
        }

        @Override
        public String sayHello(String name) {
            // 实现服务调用逻辑
            return "Hello, " + name + "!";
        }
    }
}

使用ZooKeeper实现分布式锁

以下是一个简单的分布式锁示例,展示了如何使用ZooKeeper实现分布式锁。

创建分布式锁示例:

import org.apache.zookeeper.*;
import org.apache.zookeeper.client.ZooKeeperClient;

import java.util.concurrent.CountDownLatch;

public class SimpleDistributedLock {
    private static final String ZK_ADDRESS = "localhost:2181";
    private static final String LOCK_PATH = "/lock";

    public class DistributedLock implements Watcher {
        private ZooKeeper zk;
        private String lockId;
        private CountDownLatch latched = new CountDownLatch(1);

        public DistributedLock(String lockId) throws IOException, InterruptedException {
            zk = new ZooKeeper(ZK_ADDRESS, 3000, this);
            this.lockId = lockId;
            zk.create(LOCK_PATH, lockId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            latched.await();
        }

        @Override
        public void process(WatchedEvent event) {
            if (event.getType() == Watcher.Event.EventType.NodeCreated) {
                latched.countDown();
            }
        }

        public void lock() throws InterruptedException {
            while (true) {
                List<String> lockIds = zk.getChildren(LOCK_PATH, false);
                if (lockIds.contains(lockId)) {
                    return;
                }
                zk.create(LOCK_PATH + "/" + lockId, lockId.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                synchronized (lockIds) {
                    if (lockIds.contains(lockId)) {
                        return;
                    }
                }
            }
        }

        public void unlock() {
            zk.delete(LOCK_PATH + "/" + lockId, -1);
        }

        public void close() throws InterruptedException {
            zk.close();
        }

        public static void main(String[] args) throws IOException, InterruptedException {
            DistributedLock lock1 = new DistributedLock("lock1");
            DistributedLock lock2 = new DistributedLock("lock2");

            lock1.lock();
            System.out.println("Lock1 acquired");

            lock2.lock();
            System.out.println("Lock2 acquired");

            lock1.unlock();
            System.out.println("Lock1 released");

            lock2.unlock();
            System.out.println("Lock2 released");
        }
    }
}

以上是Java分布式学习入门指南的详细内容,涵盖了Java分布式系统的各个方面,包括基础概念、开发基础、架构模式、实战案例、数据存储和协调服务。希望对你有所帮助。

0人推荐
随时随地看视频
慕课网APP