标签搜索

目 录CONTENT

文章目录

手写框架之Dubbo(RPC)

陈铭
2021-07-11 / 0 评论 / 0 点赞 / 161 阅读 / 1,562 字 / 正在检测是否收录...

执行流程

Dubbo服务端

该框架的Dubbo服务端主要维护好本地注册的服务和远程注册的服务URL,并且暴露给消费端接口进行调用。为了实现服务端的活动,提供基本的运行环境,服务端是在内嵌Tomcat环境下运行的。

Dubbo消费端

该框架的Dubbo消费端主要根据接口进行RPC调用,其调用的本质是根据接口产生代理对象(JDK动态代理),来获取远程注册的URL去发现Dubbo服务的本地注册信息,进而发起请求,在服务端根据反射进行方法的执行并返回。

执行流程图

如图所示
image

注册模块

远程注册

远程注册,本质上就是维护一个map,key是接口名,value是接口对应方法的URL(其中定义了ip地址、端口号、servlet路径)。

    public static void register(String serviceName,URL serviceUrl){
        if (map.containsKey(serviceName)){
            List<URL> urls = map.get(serviceName);
            urls.add(serviceUrl);
            map.put(serviceName,urls);
        }else {
            List<URL> urls = new ArrayList<>();
            urls.add(serviceUrl);
            map.put(serviceName,urls);
        }

        //因为两个进程读取不到同一个map,这里取巧存成文件
        saveFile();
    }

    private static void saveFile() {
        FileOutputStream fileOutputStream = null;
        ObjectOutputStream objectOutputStream = null;
        try {
            fileOutputStream = new FileOutputStream("./tempDubbo.txt");
            objectOutputStream = new ObjectOutputStream(fileOutputStream);
            objectOutputStream.writeObject(map);

            objectOutputStream.flush();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if (fileOutputStream!=null && objectOutputStream!=null){
                try {
                    fileOutputStream.close();
                    objectOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static URL getInterfaceUrl(String serviceName){
        getFile();

        if (map.containsKey(serviceName)){
            List<URL> urls = map.get(serviceName);
            //随机分配一个url
            return urls.get(new Random().nextInt(urls.size()));
        }

        return null;
    }

    private static void getFile() {
        FileInputStream fileInputStream = null;
        ObjectInputStream objectInputStream = null;
        try {
            fileInputStream = new FileInputStream("./tempDubbo.txt");
            objectInputStream = new ObjectInputStream(fileInputStream);
            map= (Map<String, List<URL>>) objectInputStream.readObject();

        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if (fileInputStream!=null && objectInputStream!=null){
                try {
                    fileInputStream.close();
                    objectInputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }

其中,由于远程注册并不是模拟真正的注册中心,所以服务端和消费端访问的map并不一致,代码中取了巧,把map存在了txt文件中,方便测试。

本地注册

本地注册,本质上也是维护一个map,key是接口名,value是接口实现类的class。

    private static Map<String,Class> map=new HashMap<>();
    public static void register(String interfaceName,Class interfaceImplClass){
        map.put(interfaceName,interfaceImplClass);
    }
    public static Class getInterface(String interfaceName){
        return map.get(interfaceName);
    }

接口调用

代理对象生成

为了模拟实际Dubbo进行接口代理对象的注入,这里创建了Reference的工厂类,生成对应接口的代理对象。

    public static <T> T getReference(Class interfaceClass) {
        HttpClient httpClient = new HttpClient();

        //模拟从注册中心获取dubbo服务的url

        URL dubboUrl = RemoteRegister.getInterfaceUrl(interfaceClass.getName());

        if (dubboUrl != null) {
            return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                    //发起post请求
                    Invocation invocation = new Invocation(DubboProviderService.class.getName(), method.getName(), method.getParameterTypes(), args);
                    String result = httpClient.post(dubboUrl.getHostName(), dubboUrl.getPort(), dubboUrl.getDubboPath(), invocation);

                    return result;
                }
            });
        }else {
            return null;
        }
    }

上述代码本质上就是获取对应接口的URL(从远程注册处获取),根据URL发起http请求。

远程注册URL拉取

远程注册URL的拉取做了简单模拟,就是从map中取value。

调用请求及其方法执行

先放代码。

    public static <T> T getReference(Class interfaceClass) {
        HttpClient httpClient = new HttpClient();

        //模拟从注册中心获取dubbo服务的url

        URL dubboUrl = RemoteRegister.getInterfaceUrl(interfaceClass.getName());

        if (dubboUrl != null) {
            return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                    //发起post请求
                    Invocation invocation = new Invocation(DubboProviderService.class.getName(), method.getName(), method.getParameterTypes(), args);
                    String result = httpClient.post(dubboUrl.getHostName(), dubboUrl.getPort(), dubboUrl.getDubboPath(), invocation);

                    return result;
                }
            });
        }else {
            return null;
        }
    }

根据接口内的对应方法,用Invocation对象封装接口名,方法名,参数类,实际参数,进而用HttpClient对象发起post请求,让服务端反射执行方法。

HttpServer和HttpClient

HttpServer

该对象模拟Dubbo服务端的运行情况,根据内嵌的Tomcat生成web环境,来接受消费端RPC调用的http请求。该类就定义了一个start方法

    public void start(String port, String hostName){
        Tomcat tomcat = new Tomcat();

        Server server = tomcat.getServer();
        Service service = server.findService("Tomcat");

        Connector connector = new Connector();
        connector.setPort(Integer.parseInt(port));

        Engine engine = new StandardEngine();
        engine.setDefaultHost(hostName);


        Host host = new StandardHost();
        host.setName(hostName);


        String contextPath="";
        Context context = new StandardContext();
        context.setPath(contextPath);
        context.addLifecycleListener(new Tomcat.FixContextListener());

        //维护起层级关系
        host.addChild(context);
        engine.addChild(host);
        service.setContainer(engine);
        service.addConnector(connector);

        tomcat.addServlet(contextPath,"dispatcher",new DispatcherServlet());
        context.addServletMappingDecoded("/DubboProviderService/get","dispatcher");

        try {
            tomcat.start();
            tomcat.getServer().await();
        } catch (LifecycleException e) {
            e.printStackTrace();
        }


    }

start方法中使用了自定义的servlet(DispatcherServlet类),主要接收Dubbo的RPC调用的请求,并执行对应方法。DispatcherServlet对象接收到请求会执行HttpServerHandler对象的handler方法。这个方法就是具体Dubbo调用的业务逻辑。

public class DispatcherServlet extends HttpServlet {
    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        new HttpServerHandler().handler(request,response);
    }
}



public class HttpServerHandler  {
    public void handler(HttpServletRequest request, HttpServletResponse response){
        InputStream inputStream=null;
        ObjectInputStream objectInputStream=null;
        try {
            inputStream = request.getInputStream();
            objectInputStream = new ObjectInputStream(inputStream);
            Invocation invocation = (Invocation) objectInputStream.readObject();

            if (invocation!=null) {
                String interfaceName = invocation.getInterfaceName();
                String methodName = invocation.getMethodName();
                Class[] parametersClass = invocation.getParametersClass();
                Object[] parameters = invocation.getParameters();

                //获取接口实现类
                Class anInterface = LocalRegister.getInterface(interfaceName);
                if (anInterface!=null){
                    Method method = anInterface.getMethod(methodName, parametersClass);
                    if (method!=null){
                        String result = (String) method.invoke(anInterface.newInstance(), parameters);
                        ServletOutputStream outputStream = response.getOutputStream();
                        IOUtils.write(result, outputStream);
                    }
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        }


    }
}

handler方法里面主要是接收消费端发来的Invocation对象,根据反射创建接口实现类的bean,在执行对应方法。最后将执行结果返回给response,让消费端接收。

HttpClient

给对象模拟了发送http请求的情况,对象内就一个post方法,方法里面是用HttpURLConnection发起http请求,并在请求中封装对应的Invocation对象。

public class HttpClient {

    public String post(String hostName , String port, String hostPath, Invocation invocation){
        String result="";
        InputStream inputStream=null;
        OutputStream outputStream=null;
        ObjectOutputStream objectOutputStream=null;

        try {
            URL url = new URL("http", hostName,Integer.parseInt(port), hostPath);
            HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
            httpURLConnection.setRequestMethod("POST");
            httpURLConnection.setDoOutput(true);

            outputStream = httpURLConnection.getOutputStream();
            objectOutputStream = new ObjectOutputStream(outputStream);
            objectOutputStream.writeObject(invocation);

            objectOutputStream.flush();

            inputStream = httpURLConnection.getInputStream();
            result = IOUtils.toString(inputStream);

            System.out.println("执行结果:"+result);

        } catch (MalformedURLException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if (outputStream!=null && objectOutputStream!=null && inputStream!=null) {
                try {
                    inputStream.close();
                    outputStream.close();
                    objectOutputStream.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            return result;
        }
    }
}

Gitee源码

源码

0

评论区