Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fixed the issue of reading a large number of files OOM and upgraded the ES version #871

Merged
merged 4 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void test1() {
RequestPayload payload = RequestPayload.builder().maxTokens(4000).anthropicVersion("vertex-2023-10-16").messages(Lists.newArrayList(Message.builder().role("user")
.content(content)
.build())).build();
ResponsePayload r = c.call(c.token(), payload);
ResponsePayload r = c.call(c.token(""), payload);
System.out.println(r.getContent().get(0).getText());
}
}
162 changes: 73 additions & 89 deletions jcommon/es/pom.xml
Original file line number Diff line number Diff line change
@@ -1,91 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>run.mone</groupId>
<artifactId>jcommon</artifactId>
<version>1.6.0-jdk21-SNAPSHOT</version>
</parent>
<artifactId>es</artifactId>
<version>1.5-jdk21-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.0</version>
<exclusions>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore-nio</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>7.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>4.4.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.13</version>
</dependency>
<dependency>
<groupId>run.mone</groupId>
<artifactId>nacos</artifactId>
<version>1.4-v1-jdk20-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>21</source>
<target>21</target>
<verbose>true</verbose>
<encoding>UTF-8</encoding>
<compilerArguments>
<sourcepath>${project.basedir}/src/main/java</sourcepath>
</compilerArguments>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<attach>true</attach>
</configuration>
</plugin>
</plugins>
</build>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>run.mone</groupId>
<artifactId>jcommon</artifactId>
<version>1.6.0-jdk21-SNAPSHOT</version>
</parent>
<artifactId>es</artifactId>
<version>1.7-jdk8-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<es.version>7.17.21</es.version>
</properties>

<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${es.version}</version>
</dependency>

<dependency>
<groupId>run.mone</groupId>
<artifactId>nacos</artifactId>
<version>1.4-v1-jdk20-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>21</source>
<target>21</target>
<verbose>true</verbose>
<encoding>UTF-8</encoding>
<compilerArguments>
<sourcepath>${project.basedir}/src/main/java</sourcepath>
</compilerArguments>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<attach>true</attach>
</configuration>
</plugin>
</plugins>
</build>
</project>
13 changes: 6 additions & 7 deletions jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.*;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -47,6 +43,7 @@
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.*;
Expand Down Expand Up @@ -186,7 +183,9 @@ private RestClientBuilder createRestClientBuilder(List<HttpHost> hosts, Header[]
}

private void initializeHighLevelClient(RestClientBuilder clientBuilder) {
this.client = new RestHighLevelClient(clientBuilder);
this.client = new RestHighLevelClientBuilder(clientBuilder.build())
.setApiCompatibilityMode(true)
.build();
this.restClient = client.getLowLevelClient();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.TimeValue;

import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,69 +1,79 @@
package com.xiaomi.mone.es.test;

import com.xiaomi.data.push.nacos.NacosConfig;
import com.xiaomi.mone.es.EsProcessor;
import com.xiaomi.mone.es.EsClient;
import com.xiaomi.mone.es.ProcessorConf;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.junit.Test;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class EsProcessorClientTest {

@Test
public void bulkInsert() throws InterruptedException {

NacosConfig config = new NacosConfig();
config.setDataId("zzy_new");
// config.init();

String ip = config.getConfig("es_ip");
String user = config.getConfig("es_user");
String pwd = config.getConfig("es_password");
ProcessorConf conf = new ProcessorConf(100, 5, 1, 100, 3, 5, new EsClient(ip, user, pwd), new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
System.out.println("before insert" + request);
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
System.out.println("success after,request:" + request.getDescription() + " resopnse:" + Arrays.toString(response.getItems()));
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
System.out.println("success after,request:" + request + " failure:" + failure);
}
});
EsProcessor processor = new EsProcessor(conf);
try {
String indexName = "zgq_common_milog_staging_free_private_1-" + new SimpleDateFormat("yyyy.MM.dd").format(new Date());
Map<String, Object> data = new HashMap<>();
data.put("timestamp", System.currentTimeMillis());
data.put("filename", "/home/work/log/log-manager/server.log1");
int n = 1;
int count = 0;
while (true) {
//package com.xiaomi.mone.es.test;
//
//import com.google.common.reflect.TypeToken;
//import com.google.gson.Gson;
//import com.xiaomi.data.push.nacos.NacosConfig;
//import com.xiaomi.mone.es.EsClient;
//import com.xiaomi.mone.es.EsProcessor;
//import com.xiaomi.mone.es.ProcessorConf;
//import org.elasticsearch.action.bulk.BulkProcessor;
//import org.elasticsearch.action.bulk.BulkRequest;
//import org.elasticsearch.action.bulk.BulkResponse;
//import org.junit.Test;
//
//import java.text.SimpleDateFormat;
//import java.time.Instant;
//import java.util.Arrays;
//import java.util.Date;
//import java.util.HashMap;
//import java.util.Map;
//
//public class EsProcessorClientTest {
//
// @Test
// public void bulkInsert() throws InterruptedException {
//
// String str = "{\"@timestamp\":\"2024-06-20T19:39:15.871+08:00\",\"@version\":\"1\",\"message\":\"hello world data test wtt~\",\"logger_name\":\"com.xiaomi.ai.Application\",\"thread_name\":\"http-nio-10010-exec-3\",\"level\":\"INFO\",\"level_value\":20000,\"LOG_NAME\":\"ai-workflow\",\"SENTRY_ENABLED\":\"false\",\"user_name\":\"wangjunfei3\",\"user_team\":\"ncl7150\",\"request_uri\":\"/hello\",\"trace_id\":\"9cf73bfe51e877a83806ac01b6630815\",\"trace_flags\":\"01\",\"span_id\":\"c13434f908acebb4\"}";
// Map<String, Object> data = new Gson().fromJson(str, new TypeToken<Map<String, Object>>() {
// }.getType());
// data.put("timeStamp", System.currentTimeMillis());
//
// NacosConfig config = new NacosConfig();
// config.setDataId("zzy_new");
//// config.init();
//
// String ip = "zjydw.api.es.srv:80";
// String user = config.getConfig("es_user");
// String pwd = config.getConfig("es_password");
// String token = "4244b7014a5c44fea63bea711c7697fe";
// String catalog = "es_zjy_log";
// String database = "default";
//
// EsClient esClient = new EsClient(ip, token, catalog, database);
// ProcessorConf conf = new ProcessorConf(100, 5, 1, 100, 3, 5, esClient, new BulkProcessor.Listener() {
// @Override
// public void beforeBulk(long executionId, BulkRequest request) {
// System.out.println("before insert" + request);
// }
//
// @Override
// public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// System.out.println("success after,request:" + request.getDescription() + " resopnse:" + Arrays.toString(response.getItems()));
// }
//
// @Override
// public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// System.out.println("success after,request:" + request + " failure:" + failure);
// }
// });
// EsProcessor processor = new EsProcessor(conf);
// try {
// String indexName = "prod_hera_index_95956-" + new SimpleDateFormat("yyyy.MM.dd").format(new Date());
// int n = 1;
// int count = 0;
// while (true) {
//// processor.bulkInsert(indexName, data);
// processor.bulkInsert(indexName, data);
processor.bulkInsert(indexName, data);
count++;
if (count == n) {
break;
}
}
// Thread.sleep(10000l);
System.in.read();
}catch (Exception e){
e.printStackTrace();
}

}
}
// count++;
// if (count == n) {
// break;
// }
// }
//// Thread.sleep(10000l);
// System.in.read();
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// }
//}
2 changes: 2 additions & 0 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
public interface ILogFile {

int LINE_MAX_LENGTH = 1100000;

void readLine() throws IOException;

void setStop(boolean stop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class LogFile implements ILogFile {

private String md5;

private static final int LINE_MAX_LENGTH = 50000;
// private static final int LINE_MAX_LENGTH = 50000;

public LogFile() {

Expand Down
Loading
Loading