Integrating Spring Boot with Elasticsearch

leo 3325 2021-09-08

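Environment

Elasticsearch.

If you deploy the Elasticsearch cluster with docker-compose, you can refer to my article "Deploying an Elasticsearch Cluster with docker-compose" (docker-compose部署ElasticSearch集群).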

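Dependencies

The dependencies in pom.xml are listed below. The Spring Boot version is 2.4.0 (the 2.4.x line dropped the .RELEASE suffix from its version numbers).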

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- elasticsearch starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        <version>2.4.3</version>
    </dependency>

    <!--lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>

</dependencies>

Configuration

Here I combine Java configuration with a configuration file. The configuration class is as follows:

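import java.util.ArrayList;
import java.util.List;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Elasticsearch configuration class
 *
 * @author Leo
 * @create 2021/3/2 10:51
 **/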
@Configuration
public class ElasticsearchConfig {

    @Value("${elasticsearch.schema}")
    private String schema;
    @Value("${elasticsearch.address}")
    private String address;
    @Value("${elasticsearch.connectTimeout}")
    private int connectTimeout;
    @Value("${elasticsearch.socketTimeout}")
    private int socketTimeout;
    @Value("${elasticsearch.connectionRequestTimeout}")
    private int tryConnTimeout;
    @Value("${elasticsearch.maxConnectNum}")
    private int maxConnNum;
    @Value("${elasticsearch.maxConnectPerRoute}")
    private int maxConnectPerRoute;
    @Value("${elasticsearch.userName}")
    private String userName;
    @Value("${elasticsearch.password}")
    private String password;

    /**
     * Elasticsearch REST client configuration.
     *
     * @return the configured RestHighLevelClient
     */
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        // Credentials
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));

        // Parse the comma-separated host:port addresses
        List<HttpHost> hostLists = new ArrayList<>();
        String[] hostList = address.split(",");
        for (String addr : hostList) {
            String[] hostAndPort = addr.trim().split(":");
            hostLists.add(new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1]), schema));
        }

        HttpHost[] httpHost = hostLists.toArray(new HttpHost[]{});
        RestClientBuilder builder = RestClient.builder(httpHost);
        // Connection timeout settings
        builder.setRequestConfigCallback(requestConfigBuilder -> {
            requestConfigBuilder.setConnectTimeout(connectTimeout);
            requestConfigBuilder.setSocketTimeout(socketTimeout);
            requestConfigBuilder.setConnectionRequestTimeout(tryConnTimeout);
            return requestConfigBuilder;
        });
        // Connection pool limits and credentials
        builder.setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.setMaxConnTotal(maxConnNum);
            httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            return httpClientBuilder;
        });
        return new RestHighLevelClient(builder);
    }
}
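
The address parsing inside the configuration class can be tried out without Spring or an Elasticsearch client. The following is a minimal sketch, assuming the same comma-separated host:port format used by elasticsearch.address. The class AddressParserSketch, its Node holder, and the host names es01 and es02 are hypothetical names made up for this illustration. Unlike the loop above, it rejects an entry that has no port instead of failing with an ArrayIndexOutOfBoundsException.

```java
import java.util.ArrayList;
import java.util.List;

public class AddressParserSketch {

    /** Simple value holder for one parsed node (hypothetical helper, not an Elasticsearch class). */
    static final class Node {
        final String host;
        final int port;

        Node(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /** Parses "host1:9200,host2:9200" into nodes, rejecting entries that are not host:port. */
    static List<Node> parse(String address) {
        List<Node> nodes = new ArrayList<>();
        for (String entry : address.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected host:port but got: " + entry);
            }
            nodes.add(new Node(parts[0], Integer.parseInt(parts[1])));
        }
        return nodes;
    }

    public static void main(String[] args) {
        // es01 and es02 are made-up host names for a two-node cluster
        for (Node node : parse("es01:9200, es02:9200")) {
            System.out.println(node.host + " -> " + node.port);
        }
    }
}
```

In the real configuration class each parsed pair would be turned into an HttpHost together with the configured schema.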

The application.yml configuration file is as follows:

spring:
  application:
    name: hello-elasticsearch

elasticsearch:
  schema: http
  address: localhost:9200
  connectTimeout: 5000
  socketTimeout: 5000
  connectionRequestTimeout: 1000
  maxConnectNum: 50
  maxConnectPerRoute: 10
  userName:
  password:

Testing

Create an entity class

This class corresponds to the data structure we store in ES. Book is used as the example here.

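import java.time.LocalDate;

import lombok.Data;
import lombok.experimental.Accessors;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

/**
 * @author Leo
 * @create 2021/6/9 14:58
 **/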
@Data
@Accessors(chain = true)
@Document(indexName = "books")
public class Book {
    @Id
    private String id;

    @Field(type = FieldType.Text)
    private String name;

    @Field(type = FieldType.Text)
    private String summary;

    @Field(type = FieldType.Integer)
    private Integer price;

    @Field(type = FieldType.Date, format = DateFormat.date)
    private LocalDate publishDate;
}

Annotations bind the object to an index in ES and define the field types, among other things.
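
To make the field mapping more concrete, here is a rough sketch of what a Book might look like as a JSON document. It assumes that DateFormat.date corresponds to the yyyy-MM-dd pattern, so a LocalDate ends up as a string such as 2015-07-23. The class BookDocumentSketch and its methods are hypothetical. The JSON is built by hand purely for illustration, with no string escaping. In the real application Spring Data performs the conversion and may add extra metadata fields to the stored document.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class BookDocumentSketch {

    /** Formats a date as yyyy-MM-dd, the pattern assumed here for DateFormat.date. */
    static String formatDate(LocalDate date) {
        return date.format(DateTimeFormatter.ISO_LOCAL_DATE);
    }

    /** Hand-built JSON for illustration only (no escaping); not how Spring Data serializes. */
    static String toJson(String name, int price, LocalDate publishDate) {
        return "{\"name\":\"" + name + "\",\"price\":" + price
                + ",\"publishDate\":\"" + formatDate(publishDate) + "\"}";
    }

    public static void main(String[] args) {
        // prints {"name":"Book0","price":78,"publishDate":"2015-07-23"}
        System.out.println(toJson("Book0", 78, LocalDate.of(2015, 7, 23)));
    }
}
```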

Create the DAO class

Like the classes we commonly define for database access, this interface defines the operations for accessing ES.

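import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Repository;

/**
 * @author Leo
 * @create 2021/6/9 15:02
 **/
@Repository
public interface BookRepository extends ElasticsearchRepository<Book, String> {
    /**
     * Counts the books with the given name.
     * @param name the name to match
     * @return the number of matching books
     */
    long countByName(String name);

    /**
     * Finds books by name. The result can also be received as a List.
     * @param name the name to match
     * @return the matching books
     */
    Iterable<Book> findByName(String name);

    /**
     * Finds books by name, with paging.
     * @param name the name to match
     * @param pageable the page request
     * @return one page of matching books
     */
    Page<Book> findByName(String name, Pageable pageable);

    /**
     * Pattern (LIKE-style) query on the name, ignoring case.
     * @param name the pattern to match
     * @return the matching books
     */
    List<Book> findByNameLike(String name);

    /**
     * Finds books priced below the given value.
     * The returned Stream should be closed after use.
     * @param price the upper bound (exclusive)
     * @return a stream of matching books
     */
    Stream<Book> findByPriceLessThan(Integer price);

    /**
     * Asynchronous query.
     * The return type can be Future, CompletableFuture, or ListenableFuture.
     * Requires @EnableAsync on a configuration class.
     * @param name the name to match
     * @return a future holding the matching book
     */
    @Async
    CompletableFuture<Book> findOneByName(String name);
}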

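We do not need to implement this interface to work with Elasticsearch. As long as method names, parameters, and return types follow certain conventions, the Spring Data Elasticsearch module parses the meaning of each method automatically (note that BookRepository extends ElasticsearchRepository) and accesses ES through RestHighLevelClient under the hood. For the complete rules, see the official Spring documentation.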
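
The idea of deriving a query from a method name can be illustrated with a toy parser. This is only a sketch of the naming convention and not Spring Data's actual implementation: the class DerivedQuerySketch is hypothetical, it knows just three keywords, and it treats a name without a keyword as a plain match on the property.

```java
import java.util.Arrays;
import java.util.List;

public class DerivedQuerySketch {

    // A small subset of keywords chosen for this illustration; the real list is much longer.
    private static final List<String> KEYWORDS = Arrays.asList("LessThan", "GreaterThan", "Like");

    /** Splits a method name such as findByPriceLessThan into subject, property, and keyword. */
    static String describe(String methodName) {
        int by = methodName.indexOf("By");
        if (by < 0) {
            throw new IllegalArgumentException("No 'By' in method name: " + methodName);
        }
        String subject = methodName.substring(0, by);   // e.g. find, count, findOne
        String predicate = methodName.substring(by + 2); // e.g. Name, PriceLessThan
        String keyword = "Is";                           // default when no keyword suffix is present
        for (String candidate : KEYWORDS) {
            if (predicate.endsWith(candidate)) {
                keyword = candidate;
                predicate = predicate.substring(0, predicate.length() - candidate.length());
                break;
            }
        }
        if (predicate.isEmpty()) {
            throw new IllegalArgumentException("No property in method name: " + methodName);
        }
        String property = Character.toLowerCase(predicate.charAt(0)) + predicate.substring(1);
        return subject + " where " + property + " " + keyword;
    }

    public static void main(String[] args) {
        System.out.println(describe("countByName"));         // count where name Is
        System.out.println(describe("findByNameLike"));      // find where name Like
        System.out.println(describe("findByPriceLessThan")); // find where price LessThan
    }
}
```

The property part must match a field of the entity (name or price in Book), which is why a misspelled method name fails when the application starts rather than when the query runs.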

Create a test class and test the corresponding methods

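import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.test.context.junit4.SpringRunner;

@Slf4j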
@RunWith(SpringRunner.class)
@SpringBootTest
public class HelloElasticsearchApplicationTests {

    @Resource
    private BookRepository bookRepository;

    /**
     * Insert
     */
    @Test
    public void testSave() {
        bookRepository.save(new Book()
//                .setId("1")
                .setName("jvm GC原理")
                .setPrice(78)
                .setPublishDate(LocalDate.of(2015, 7, 23))
                .setSummary("详细讲解Java 虚拟机垃圾回收机制及其原理"));
    }

    /**
     * Bulk insert
     */
    @Test
    public void testSaveAll() {
        List<Book> books = new ArrayList<>();
        for(int i = 0; i < 20; i++) {
            books.add(new Book()
                    .setId(String.valueOf(i))
                    .setName("Book" + i)
                    .setPrice(new Random().nextInt(200))
                    .setPublishDate(LocalDate.now())
                    .setSummary(String.valueOf(i)));
        }
        bookRepository.saveAll(books);

    }

    /**
     * Find one record, find all records, and count records
     */
    @Test
    public void testFind() {
//        bookRepository.findById("1").ifPresent(book -> log.info(book.toString()));
        log.info("count of books : " + bookRepository.count());
//        bookRepository.findById("2").ifPresent(book -> log.info(book.toString()));
        bookRepository.findAll().forEach(book -> log.info(book.toString()));
    }

    /**
     * Test paging and sorting
     */
    @Test
    public void testPage() {
        bookRepository.findAll(PageRequest.of(0, 10, Sort.by("price").descending()))
                .forEach(book -> log.info(book.toString()));
    }

    /**
     * Delete
     */
    @Test
    public void testDelete() {
        bookRepository.deleteById("0");
//        bookRepository.deleteAll();
    }

    /**
     * Count by condition
     */
    @Test
    public void testCount() {
        String name = "深入理解JVM";
        log.info("count of book 《" + name + "》 : " + bookRepository.countByName(name));
    }

    @Test
    public void testFindByName() {
        String name = "深入理解JVM";
        bookRepository.findByName(name).forEach(book -> log.info(book.toString()));
    }

    @Test
    public void testFindByNamePageable() {
        String name = "深入理解JVM";
        bookRepository.findByName(name, PageRequest.of(0, 1)).forEach(book -> log.info(book.toString()));
    }

    /**
     * Pattern (LIKE-style) query
     */
    @Test
    public void testFindByNameLike() {
        String name = "JVM";
        bookRepository.findByNameLike(name).forEach(book -> log.info(book.toString()));
    }

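    /**
     * Comparison query
     */
    @Test
    public void testFindByPriceLessThan() {
        int price = 100;
        // The Stream wraps underlying resources, so close it after use
        try (Stream<Book> books = bookRepository.findByPriceLessThan(price)) {
            books.forEach(book -> log.info(book.toString()));
        }
    }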

    /**
     * Asynchronous execution
     */
    @Test
    public void testAsync() {
        String name = "深入理解JVM";
        CompletableFuture<Book> future = bookRepository.findOneByName(name);
        future.whenComplete((book, throwable) -> {
            if (throwable != null) {
                log.error("error! reason : ", throwable);
            } else {
                log.info("Async query get Result: " + book);
            }
        }).join(); // wait for the callback, otherwise the test may finish before the query completes
    }
}
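
A test method returns as soon as its body finishes, so a callback registered on a CompletableFuture may never run unless the test waits for it. The following minimal sketch shows the pattern without Elasticsearch. The class AsyncJoinSketch and its findOneByName method are hypothetical stand-ins for the asynchronous repository call.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncJoinSketch {

    /** Stand-in for an asynchronous repository call (hypothetical; no Elasticsearch involved). */
    static CompletableFuture<String> findOneByName(String name) {
        return CompletableFuture.supplyAsync(() -> "Book(name=" + name + ")");
    }

    /** Registers a callback and then blocks until the callback has run. */
    static String queryAndWait(String name) {
        StringBuilder seen = new StringBuilder();
        findOneByName(name)
                .whenComplete((book, error) -> {
                    if (error == null) {
                        seen.append(book);
                    }
                })
                .join(); // without this, the caller may return before the callback runs
        return seen.toString();
    }

    public static void main(String[] args) {
        System.out.println(queryAndWait("JVM")); // prints Book(name=JVM)
    }
}
```

Calling join() on the future returned by whenComplete, rather than on the original future, guarantees that the callback itself has finished before the method returns.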

If Kibana is installed, we can open the Kibana UI at http://localhost:5601/app/home#/ and use the Console in Kibana Dev Tools to query the data conveniently.

kibana-dev-tools

Alternatively, view all the data as a list in the Kibana Discover view.

kibana-discover