环境与版本 Elasticsearch版本 服务端:elasticsearch 7.5.1 单节点
客户端:elasticsearch 6.8.5
Logstash获取MySQL数据的配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 input { jdbc { add_field => { "myid" =>"jdbc" } jdbc_connection_string => "<mysql 地址>" jdbc_user => "xxxxxxxxxxxxx" jdbc_password => "xxxxxxxxxxxx" ##数据库驱动的JAR位置 jdbc_driver_library => "/usr/share/logstash/config/mysql-connector-java-5.1.47.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" statement => "select id,username,realname,age,birth from tb_user" ## 每分钟执行一次 schedule => "* * * * *" } } output { if [ myid] == "jdbc" { elasticsearch { ##elasticsearch 地址 hosts => "<elasticsearch 地址>:9200" ##索引名称 index => "index-user" document_id => "%{id}" ##索引类型 document_type => "user" } stdout { codec => json_lines } } }
Spring 版本 Spring boot:2.2.2.RELEASE
spring-boot-data-elasticsearch:3.2 对应elasticsearch的版本为6.8.5
完整pom文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 2.2.2.RELEASE</version > <relativePath /> </parent > <groupId > com.monochrome</groupId > <artifactId > elasticsearch-test</artifactId > <version > 0.0.1-SNAPSHOT</version > <name > elasticsearch-test</name > <description > Demo project for ElasticSearch</description > <properties > <java.version > 1.8</java.version > <spring-boot-admin.version > 2.2.1</spring-boot-admin.version > <spring-cloud.version > Hoxton.SR1</spring-cloud.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-actuator</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-jpa</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > de.codecentric</groupId > <artifactId > spring-boot-admin-starter-client</artifactId > </dependency > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-starter-netflix-eureka-client</artifactId > </dependency > <dependency > <groupId > mysql</groupId > <artifactId > mysql-connector-java</artifactId > <version > 5.1.47</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <optional > true</optional > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-data-elasticsearch</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > <exclusions > <exclusion > <groupId > org.junit.vintage</groupId > <artifactId > junit-vintage-engine</artifactId > </exclusion > </exclusions > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 2.9.2</version > </dependency > </dependencies > <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.cloud</groupId > <artifactId > spring-cloud-dependencies</artifactId > <version > ${spring-cloud.version}</version > <type > pom</type > <scope > import</scope > </dependency > <dependency > <groupId > de.codecentric</groupId > <artifactId > spring-boot-admin-dependencies</artifactId > <version > ${spring-boot-admin.version}</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > <build > <plugins > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > </plugin > </plugins > </build > </project >
PS:项目用到了Spring Cloud,如果不需要可以去除相关依赖。
Spring Boot配置文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 spring: application: name: elasticsearch-test datasource: driver-class-name: com.mysql.jdbc.Driver url: <Mysql address> username: xxxxxxxxxx password: xxxxxxxxxx data: elasticsearch: cluster-name: docker-cluster cluster-nodes: <elasticsearch 集群地址> elasticsearch: rest: uris: ["<elasticsearch 地址>" ] ipAddrs: - <elasticsearch 地址> eureka: client: service-url: defaultZone: <Eureka地址> instance: instance-id: elasticsearch-test prefer-ip-address: true server: port: 8091 info: app: name: elasticsearch-test environment: test version: 1.0 .0
项目搭建 实体类 定义一个User
实体类以及EsUser
实体类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Data @Entity(name = "tb_user") public class User implements Serializable { @Id @GeneratedValue(strategy= GenerationType.IDENTITY) private Long id; private String username; private String realname; private Integer age; private Date birth; } @Data @Document(indexName = "index-user", refreshInterval = "0s") public class EsUser implements Serializable { @Id private Long id; private String username; private String realname; private Integer age; private Date birth; }
配置Elasticsearch Rest High Level Client Elasticsearch(ES)有两种连接方式:transport、rest。transport通过TCP方式访问ES(只支持java),rest方式通过http API 访问ES(没有语言限制)。 ES官方建议使用rest方式, transport 在7.0版本中不建议使用,在8.X的版本中废弃。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @EqualsAndHashCode(callSuper = true) @Data @AllArgsConstructor @NoArgsConstructor public class ElasticsearchRestClientConfig extends AbstractElasticsearchConfiguration { private static final int ES_IP_ADDRESSES_LENGTH = 2 ; private static final String HTTP_SCHEME = "http" ; @Value("${spring.elasticsearch.rest.ipAddrs}") private List<String> ipAddresses = new ArrayList <>(); @Bean public RestClientBuilder restClientBuilder () { HttpHost[] hosts = ipAddresses.stream() .map(this ::makeHttpHost) .filter(Objects::nonNull) .toArray(HttpHost[]::new ); return RestClient.builder(hosts); } private HttpHost makeHttpHost (String s) { assert StringUtils.isNotEmpty(s); String[] address = s.split(":" ); if (address.length == ES_IP_ADDRESSES_LENGTH) { String ip = address[0 ]; int port = Integer.parseInt(address[1 ]); return new HttpHost (ip, port, HTTP_SCHEME); } return null ; } @Override public RestHighLevelClient elasticsearchClient () { HttpHost[] hosts = ipAddresses.stream() .map(this ::makeHttpHost) .filter(Objects::nonNull) .toArray(HttpHost[]::new ); return new RestHighLevelClient (RestClient.builder(hosts)); } }
使用Spring data Jpa Repository操作数据库 1 2 3 4 @Repository public interface UserRepository extends JpaRepository <User, Long> {}
使用Spring Data Elasticsearch Repositories操作Elasticsearch 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 public interface EsUserRepository extends ElasticsearchRepository <EsUser, Long> { List<EsUser> findByUsername (String username) ; List<EsUser> findByUsernameAndRealname (String username, String realname) ; List<EsUser> findByUsernameOrRealname (String username, String realname) ; List<EsUser> findByUsernameNot (String username) ; List<EsUser> findByAgeBetween (Integer ageFrom, Integer ageTo) ; List<EsUser> findByBirthLessThan (LocalDateTime birthTo) ; List<EsUser> findByBirthGreaterThan (LocalDateTime birthFrom) ; List<EsUser> findByAgeBefore (Integer ageTo) ; List<EsUser> findByAgeAfter (Integer ageFrom) ; List<EsUser> findByUsernameLike (String username) ; List<EsUser> findByUsernameStartingWith (String start) ; List<EsUser> findByUsernameEndingWith (String end) ; List<EsUser> findByUsernameContaining (String word) ; List<EsUser> findByUsernameIn (Collection<String> usernames) ; List<EsUser> findByUsernameNotIn (Collection<String> usernames) ; List<EsUser> findByAgeBeforeAndUsernameStartingWithAndIdGreaterThanOrderByAgeDesc (Integer ageTo, String start, Long idTo) ; }
控制层 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @RestController @RequestMapping("/user") @Slf4j public class UserController { @Autowired private EsUserRepository esUserRepository; @Autowired private UserRepository userRepository; @PostMapping public void saveUser (@RequestBody User user) { userRepository.save(user); } @DeleteMapping("/{id:\\d+}") public void removeUser (@PathVariable("id") Long id) { userRepository.deleteById(id); } @PostMapping("/list") public void saveUsers (@RequestBody List<User> users) { userRepository.saveAll(users); } @GetMapping("/username/{username}") public List<EsUser> getUserByUsername (@PathVariable String username) { return esUserRepository.findByUsername(username); } @GetMapping("/usernameLike/{username}") public List<EsUser> getUserByUsernameLike (@PathVariable String username) { log.info("get User By Username Like {}" ,username); return esUserRepository.findByUsernameLike(username); } @GetMapping("/ageTo/{ageTo}/name_start/{nameStart}/id/{id:\\d+}") public List<EsUser> getUserByAgeAndUsernameAndId (@PathVariable Integer ageTo, @PathVariable String nameStart, @PathVariable Long id) { return esUserRepository.findByAgeBeforeAndUsernameStartingWithAndIdGreaterThanOrderByAgeDesc(ageTo, nameStart, id); } }
Swagger配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Configuration @EnableSwagger2 public class Swagger2 { @Bean public Docket createRestApi () { return new Docket (DocumentationType.SWAGGER_2) .apiInfo(apiInfo()) .select() .apis(RequestHandlerSelectors.basePackage("com.monochrome.elasticsearch" )) .paths(PathSelectors.any()) .build(); } private ApiInfo apiInfo () { return new ApiInfoBuilder () .title("Elasticsearch Demo RESTful APIs" ) .description("服务名:elasticsearch Demo" ) .version("1.0" ) .build(); }