Skip to content

Commit cd6c4d9

Browse files
committed
bytebuffer map serializer
1 parent f737e78 commit cd6c4d9

File tree

11 files changed

+313
-20
lines changed

11 files changed

+313
-20
lines changed

common/src/main/java/com/robin/core/fileaccess/iterator/AbstractFileIterator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.robin.comm.dal.pool.ResourceAccessHolder;
1919
import com.robin.comm.sql.CommRecordGenerator;
2020
import com.robin.comm.sql.CommSqlParser;
21-
import com.robin.comm.sql.CompareNode;
2221
import com.robin.comm.sql.SqlSegment;
2322
import com.robin.core.base.exception.MissingConfigException;
2423
import com.robin.core.base.util.Const;

common/src/main/java/com/robin/core/fileaccess/util/ByteBufferInputStream.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,7 @@ public int read(byte[] b, int off, int len) throws IOException {
4545
public int read(byte[] b) throws IOException {
4646
return read(b, 0, b.length);
4747
}
48+
public int capacity(){
49+
return count;
50+
}
4851
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.robin.core.fileaccess.util;
2+
3+
import com.robin.core.base.exception.OperationNotSupportException;
4+
import com.robin.core.fileaccess.meta.DataSetColumnMeta;
5+
import org.springframework.util.Assert;
6+
import org.springframework.util.ObjectUtils;
7+
8+
import java.io.IOException;
9+
import java.nio.ByteBuffer;
10+
import java.nio.charset.Charset;
11+
import java.util.Iterator;
12+
import java.util.List;
13+
import java.util.Map;
14+
import java.util.function.BiConsumer;
15+
16+
public class ByteBufferMapReader {
17+
18+
public static List<Map<String,Object>> unSerialize(ByteBuffer byteBuffer,List<DataSetColumnMeta> colmeta,int recordlength){
19+
return null;
20+
}
21+
public Map<String,Object> unSerialize(ByteBufferInputStream inputStream,List<DataSetColumnMeta> columns,int recordlength,int pos){
22+
int startPos=recordlength*pos;
23+
Assert.isTrue(inputStream.capacity()>startPos,"");
24+
try {
25+
byte[] bytes=new byte[recordlength];
26+
inputStream.read(bytes,startPos,recordlength);
27+
28+
}catch (IOException ex){
29+
30+
}
31+
return null;
32+
}
33+
public static int serialize(ByteBuffer byteBuffer, List<DataSetColumnMeta> columns, Iterator<Map<String,Object>> iterator, int recordlength, int maxlength
34+
, BiConsumer<Map<String,Object>,Integer> consumer){
35+
36+
int recordpos=0;
37+
int bytepos=0;
38+
try(ByteBufferOutputStream outputStream=new ByteBufferOutputStream(byteBuffer)) {
39+
while(iterator.hasNext()){
40+
if(bytepos>=maxlength){
41+
throw new OperationNotSupportException("serialize overflow max capacity "+maxlength);
42+
}
43+
Map<String,Object> valueMap=iterator.next();
44+
serialize(outputStream,columns,valueMap,recordlength,maxlength);
45+
recordpos++;
46+
bytepos+=recordlength;
47+
consumer.accept(valueMap,recordpos);
48+
}
49+
50+
}catch (IOException ex){
51+
52+
}
53+
return bytepos;
54+
}
55+
public static void serialize(ByteBufferOutputStream outputStream, List<DataSetColumnMeta> columns, Map<String,Object> valueMap, int recordlength, int maxlength) throws IOException{
56+
int curpos=0;
57+
for(DataSetColumnMeta meta:columns){
58+
if(!ObjectUtils.isEmpty(valueMap.get(meta.getColumnName()))){
59+
curpos=outputStream.writePrimitive(valueMap.get(meta.getColumnName()),curpos, Charset.forName("utf8"));
60+
}else{
61+
throw new OperationNotSupportException(" column "+meta.getColumnName()+" is null,can not process");
62+
}
63+
}
64+
if(curpos<recordlength){
65+
outputStream.writeLeft(recordlength-curpos,0);
66+
}else {
67+
throw new OperationNotSupportException("record length over record limit "+recordlength);
68+
}
69+
}
70+
}

common/src/main/java/com/robin/core/fileaccess/util/ByteBufferOutputStream.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
package com.robin.core.fileaccess.util;
22

3+
import lombok.NonNull;
4+
import org.springframework.util.Assert;
5+
36
import java.io.IOException;
47
import java.io.OutputStream;
58
import java.nio.ByteBuffer;
9+
import java.nio.LongBuffer;
10+
import java.nio.charset.Charset;
611

712
public class ByteBufferOutputStream extends OutputStream {
813
private ByteBuffer byteBuffer;
@@ -32,4 +37,61 @@ public ByteBuffer getByteBuffer() {
3237
public int getCount() {
3338
return count;
3439
}
40+
public void writeLong(long value) throws IOException{
41+
for(byte bt:GenericByteConvertor.longToBytesLittle(value)){
42+
write(bt);
43+
}
44+
}
45+
public void writeGap() throws IOException{
46+
write(0);
47+
}
48+
public void writeLeft(int length,int input) throws IOException{
49+
for(int i=0;i<length;i++){
50+
write(input);
51+
}
52+
}
53+
public void writeInt(int value) throws IOException{
54+
for(byte bt:GenericByteConvertor.intToByteLittle(value)){
55+
write(bt);
56+
}
57+
}
58+
public void writeShort(short value) throws IOException{
59+
for(byte bt:GenericByteConvertor.shortToByteLittle(value)){
60+
write(bt);
61+
}
62+
}
63+
public void writeDouble(double value) throws IOException{
64+
for(byte bt:GenericByteConvertor.double2Bytes(value)){
65+
write(bt);
66+
}
67+
}
68+
public void writeBytes(byte[] bytes) throws IOException{
69+
for(byte bt:bytes){
70+
write(bt);
71+
}
72+
}
73+
public int writePrimitive(@NonNull Object obj,int startPos, Charset charset) throws IOException{
74+
Assert.notNull(obj,"");
75+
int nextPos=startPos;
76+
if(Integer.class.isAssignableFrom(obj.getClass())){
77+
writeInt((Integer)obj);
78+
nextPos+=5;
79+
}else if(Short.class.isAssignableFrom(obj.getClass())){
80+
writeShort((Short)obj);
81+
nextPos+=3;
82+
}else if(Long.class.isAssignableFrom(obj.getClass())){
83+
writeLong((Long)obj);
84+
nextPos+=9;
85+
}else if(String.class.isAssignableFrom(obj.getClass())){
86+
byte[] bytes=obj.toString().getBytes(charset);
87+
nextPos+=bytes.length+1;
88+
writeBytes(bytes);
89+
}else{
90+
throw new IOException("no primitive type support");
91+
}
92+
writeGap();
93+
return nextPos;
94+
}
95+
96+
3597
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package com.robin.core.fileaccess.util;
2+
3+
public class GenericByteConvertor {
4+
public static byte[] intToByteLittle(int n) {
5+
byte[] b = new byte[4];
6+
b[0] = (byte) (n & 0xff);
7+
b[1] = (byte) (n >> 8 & 0xff);
8+
b[2] = (byte) (n >> 16 & 0xff);
9+
b[3] = (byte) (n >> 24 & 0xff);
10+
return b;
11+
}
12+
public static int bytes2IntLittle(byte[] bytes )
13+
{
14+
int int1=bytes[0]&0xff;
15+
int int2=(bytes[1]&0xff)<<8;
16+
int int3=(bytes[2]&0xff)<<16;
17+
int int4=(bytes[3]&0xff)<<24;
18+
19+
return int1|int2|int3|int4;
20+
}
21+
public static byte[] longToBytesLittle(long n) {
22+
byte[] b = new byte[8];
23+
b[0] = (byte) (n & 0xff);
24+
b[1] = (byte) (n >> 8 & 0xff);
25+
b[2] = (byte) (n >> 16 & 0xff);
26+
b[3] = (byte) (n >> 24 & 0xff);
27+
b[4] = (byte) (n >> 32 & 0xff);
28+
b[5] = (byte) (n >> 40 & 0xff);
29+
b[6] = (byte) (n >> 48 & 0xff);
30+
b[7] = (byte) (n >> 56 & 0xff);
31+
return b;
32+
}
33+
public static long bytesToLongLittle( byte[] array )
34+
{
35+
return ((((long) array[ 0] & 0xff) << 0)
36+
| (((long) array[ 1] & 0xff) << 8)
37+
| (((long) array[ 2] & 0xff) << 16)
38+
| (((long) array[ 3] & 0xff) << 24)
39+
| (((long) array[ 4] & 0xff) << 32)
40+
| (((long) array[ 5] & 0xff) << 40)
41+
| (((long) array[ 6] & 0xff) << 48)
42+
| (((long) array[ 7] & 0xff) << 56));
43+
}
44+
public static byte[] shortToByteLittle(short n) {
45+
byte[] b = new byte[2];
46+
b[0] = (byte) (n & 0xff);
47+
b[1] = (byte) (n >> 8 & 0xff);
48+
return b;
49+
}
50+
public static short byteToShortLittle(byte[] b) {
51+
return (short) (((b[1] << 8) | b[0] & 0xff));
52+
}
53+
public static byte[] double2Bytes(double d) {
54+
long value = Double.doubleToRawLongBits(d);
55+
byte[] byteRet = new byte[8];
56+
for (int i = 0; i < 8; i++) {
57+
byteRet[i] = (byte) ((value >> 8 * i) & 0xff);
58+
}
59+
return byteRet;
60+
}
61+
public static double bytes2Double(byte[] arr) {
62+
long value = 0;
63+
for (int i = 0; i < 8; i++) {
64+
value |= ((long) (arr[i] & 0xff)) << (8 * i);
65+
}
66+
return Double.longBitsToDouble(value);
67+
}
68+
}

etl/src/main/java/com/robin/etl/context/StatefulJobContext.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public class StatefulJobContext {
1313
private Long sourceId;
1414
private DataCollectionMeta inputMeta;
1515
private DataCollectionMeta outputMeta;
16+
private DataCollectionMeta workingMeta;
1617
private Long outputSourceId;
1718
private Map<String,Object> jobParam;
1819

etl/src/main/java/com/robin/etl/meta/model/EtlFlowCfg.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ public class EtlFlowCfg extends BaseObject {
4141
private String cronTrigger;
4242
@MappingField
4343
private Integer priority;
44+
@MappingField
45+
private String stepDefinition;
4446

4547

4648
}

etl/src/main/java/com/robin/etl/meta/service/EtlFlowCfgService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,6 @@ public Map<String, Object> getCfgConfig(Long flowId) {
7878
} catch (Exception ex) {
7979

8080
}
81-
return null;
81+
return retMap;
8282
}
8383
}

etl/src/main/java/com/robin/etl/util/AbstractProcessCycleGen.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ public abstract class AbstractProcessCycleGen {
1212
protected static DateTimeFormatter dayFormatter = DateTimeFormatter.ofPattern("yyyyMMdd");
1313
protected static DateTimeFormatter hourFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH");
1414

15-
abstract Pair<Boolean,String> genRunCycleByTypeAndStartTime(EtlConstant.CYCLE_TYPE runCycle, LocalDateTime dateTime);
16-
abstract Pair<String,LocalDateTime> getNextRunningCycle(EtlConstant.CYCLE_TYPE cycleType,LocalDateTime dateTime);
17-
abstract void finishCycle(String runCycle);
18-
abstract LocalDateTime parseTimeByType(String cycle,Integer cycleType);
15+
public abstract Pair<Boolean,String> genRunCycleByTypeAndStartTime(Integer runCycle, LocalDateTime dateTime);
16+
public abstract Pair<String,LocalDateTime> getNextRunningCycle(Integer cycleType,LocalDateTime dateTime);
17+
public abstract void finishCycle(String runCycle);
18+
public abstract LocalDateTime parseTimeByType(String cycle,Integer cycleType);
1919
}

etl/src/main/java/com/robin/etl/util/CommProcessCycleGen.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public static CommProcessCycleGen getInstance() {
2424
}
2525

2626
@Override
27-
public Pair<Boolean, String> genRunCycleByTypeAndStartTime(@NonNull EtlConstant.CYCLE_TYPE cycleType,@NonNull LocalDateTime dateTime) {
27+
public Pair<Boolean, String> genRunCycleByTypeAndStartTime(@NonNull Integer cycleType,@NonNull LocalDateTime dateTime) {
2828
Assert.notNull(dateTime,"");
2929
boolean runTag = false;
3030
StringBuilder cycle = new StringBuilder();
@@ -38,11 +38,11 @@ public Pair<Boolean, String> genRunCycleByTypeAndStartTime(@NonNull EtlConstant.
3838
int hour = dateTime.getHour();
3939
int minutes = dateTime.getMinute();
4040

41-
if (cycleType.equals(EtlConstant.CYCLE_TYPE.YEAR)) {
41+
if (EtlConstant.CYCLE_TYPE.YEAR.getInt().equals(cycleType)) {
4242
//年任务,读取指定的执行日期,执行周期为上一个年度
4343
cycle.append(year - 1);
4444
runTag = true;
45-
} else if (cycleType.equals(EtlConstant.CYCLE_TYPE.QUARTER)) {
45+
} else if (EtlConstant.CYCLE_TYPE.QUARTER.getInt().equals(cycleType)) {
4646
//季度,下一个季度第一个月开始判断
4747
int caculateYear = year;
4848
if (month % 3 == 1) {
@@ -54,7 +54,7 @@ public Pair<Boolean, String> genRunCycleByTypeAndStartTime(@NonNull EtlConstant.
5454
cycle.append(caculateYear).append("Q").append(quarter);
5555
}
5656
runTag = true;
57-
} else if (cycleType.equals(EtlConstant.CYCLE_TYPE.MONTH)) {
57+
} else if (EtlConstant.CYCLE_TYPE.MONTH.getInt().equals(cycleType)) {
5858
//每月的某一天
5959
LocalDateTime tmpTime = dateTime;
6060
//前一月
@@ -63,7 +63,7 @@ public Pair<Boolean, String> genRunCycleByTypeAndStartTime(@NonNull EtlConstant.
6363
if (null == runCycle || !runCycle.equals(cycle)) {
6464
runTag = true;
6565
}
66-
} else if (cycleType.equals(EtlConstant.CYCLE_TYPE.XUN)) {
66+
} else if (EtlConstant.CYCLE_TYPE.XUN.getInt().equals(cycleType)) {
6767
LocalDateTime tmpTime = dateTime;
6868
//前一月
6969
tmpTime = tmpTime.minusMonths(1);
@@ -75,19 +75,19 @@ public Pair<Boolean, String> genRunCycleByTypeAndStartTime(@NonNull EtlConstant.
7575
xunNum++;
7676
}
7777
cycle.append("X"+xunNum);
78-
} else if (cycleType.equals(EtlConstant.CYCLE_TYPE.WEEK)) {
78+
} else if (EtlConstant.CYCLE_TYPE.WEEK.getInt().equals(cycleType)) {
7979
//上一周范围
8080
LocalDateTime tmpDatetime = dateTime.minusWeeks(1);
8181
DayOfWeek dayOfWeek = tmpDatetime.getDayOfWeek();
8282
int value = dayOfWeek.getValue();
8383
cycle.append(year+"W"+value);
8484
runTag=true;
8585

86-
} else if (cycleType.equals(EtlConstant.CYCLE_TYPE.DAY.getInt())) {
86+
} else if (EtlConstant.CYCLE_TYPE.DAY.getInt().equals(cycleType)) {
8787
LocalDateTime tmpTs=dateTime.minusDays(1);
8888
cycle.append(dayFormatter.format(tmpTs));
8989
runTag=true;
90-
}else if(cycleType.equals(EtlConstant.CYCLE_TYPE.HOUR)){
90+
}else if(EtlConstant.CYCLE_TYPE.HOUR.getInt().equals(cycleType)){
9191
LocalDateTime tmpTs=dateTime.minusHours(1);
9292
cycle.append(hourFormatter.format(tmpTs));
9393
runTag=true;
@@ -96,30 +96,30 @@ public Pair<Boolean, String> genRunCycleByTypeAndStartTime(@NonNull EtlConstant.
9696
}
9797

9898
@Override
99-
public Pair<String,LocalDateTime> getNextRunningCycle(EtlConstant.CYCLE_TYPE cycleType, LocalDateTime dateTime) {
99+
public Pair<String,LocalDateTime> getNextRunningCycle(Integer cycleType, LocalDateTime dateTime) {
100100
Assert.notNull(dateTime,"");
101101
String cycle=null;
102102
LocalDateTime tmpTs=null;
103-
if (EtlConstant.CYCLE_TYPE.YEAR.equals(cycleType)) {
103+
if (EtlConstant.CYCLE_TYPE.YEAR.getInt().equals(cycleType)) {
104104
tmpTs= dateTime.plusYears(1);
105105
cycle=yearFormatter.format(tmpTs);
106-
}else if(EtlConstant.CYCLE_TYPE.QUARTER.equals(cycleType)){
106+
}else if(EtlConstant.CYCLE_TYPE.QUARTER.getInt().equals(cycleType)){
107107
tmpTs=dateTime.plusMonths(3);
108108
int month = tmpTs.getMonthValue();
109109
int quarter = month / 3+1;
110110
cycle=yearFormatter.format(tmpTs)+"Q"+quarter;
111-
}else if(EtlConstant.CYCLE_TYPE.MONTH.equals(cycleType)){
111+
}else if(EtlConstant.CYCLE_TYPE.MONTH.getInt().equals(cycleType)){
112112
tmpTs=dateTime.minusMonths(1);
113113
cycle=monthFormatter.format(tmpTs);
114114
}
115-
else if(EtlConstant.CYCLE_TYPE.XUN.equals(cycleType)){
115+
else if(EtlConstant.CYCLE_TYPE.XUN.getInt().equals(cycleType)){
116116
tmpTs=dateTime.plusDays(10);
117117
int xun=tmpTs.getDayOfMonth()/10+1;
118118
if(xun==4){
119119
xun=3;
120120
}
121121
cycle=monthFormatter.format(tmpTs)+"X"+xun;
122-
}else if(EtlConstant.CYCLE_TYPE.DAY.equals(cycleType)){
122+
}else if(EtlConstant.CYCLE_TYPE.DAY.getInt().equals(cycleType)){
123123
tmpTs=dateTime.plusDays(1);
124124
cycle=dayFormatter.format(tmpTs);
125125
}

0 commit comments

Comments
 (0)