-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
149 lines (127 loc) · 4.94 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
*
* Link e outros:
* https://v8.dev/features/top-level-await
* https://www.kaggle.com/datasets/stackoverflow/so-survey-2017?select=survey_results_public.csv
* https://www.kaggle.com/datasets/stackoverflow/stack-overflow-2018-developer-survey?select=survey_results_public.csv
* https://www.w3schools.com/nodejs/met_path_dirname.asp
* https://developer.mozilla.org/pt-BR/docs/Web/JavaScript/Reference/Operators/import.meta
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/filter
* https://httptoolkit.com/blog/unblocking-node-with-unref/
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/parse
* https://www.w3schools.com/js/js_json_parse.asp
* https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/JSON/stringify
* https://www.w3schools.com/js/js_json_stringify.asp
* https://www.npmjs.com/package/csvtojson
*
*
* Comandos usados:
* npm start (node --harmony-top-level-await index.js)
* npm start (DEBUG=app* --harmony-top-level-await index.js)
* npm start | tee log.log
*
* Outros:
* console.time("concat-data");
* console.timeEnd("concat-data");
*
* Com os consoles acima conseguimos ver quanto tempo nosso código
* demora para rodar tudo o que pedimos, no caso se reparar no código abaixo
* encontrará o console.time("concat-data"); mais acima, várias linhas de código
* e lá no final o console.timeEnd("concat-data"); ou seja, o tempo que levar para rodar
* o código que está no meio desses comandos vai ser exibido no final
*
*/
// console.log(await Promise.resolve(true));
import { dirname, join } from "node:path";
import { promisify } from "node:util";
import { promises, createReadStream, createWriteStream } from "node:fs";
import { pipeline, Transform } from "node:stream";
import debug from "debug";
import csvtojson from "csvtojson";
import jsontocsv from "json-to-csv-stream";
import StreamConcat from "stream-concat";
const { readdir } = promises;
const log = debug("app:concat");
const pipelineAsync = promisify(pipeline);
/**
*
* PRIMEIRA ETAPA
* Aqui garantimos os caminhos para os nossos arquivos
* Vemos onde esta rodando nosso index
* E localizamos nossa pasta com os arquivos csv etc
*
*/
// pega o caminho da onde esta rodando o arquivo
const { pathname: currentFile } = new URL(import.meta.url);
// console.log(currentFile);
// pega qual o diretório que o arquivo esta rodando
const cwd = dirname(currentFile);
// console.log(cwd);
// mostra o caminho para a pasta onde estão os arquivos csv e outros
const filesDir = `${cwd}/dataset`;
// console.log(filesDir);
// mostra onde será a saída dos dados
const output = `${cwd}/final.csv`;
// console.log(output);
console.time("concat-data");
// lemos o diretório em que estão os arquivos com dados e pegamos só os .csv
const files = (await readdir(filesDir))
.filter((item) => !!!~item.indexOf(".zip"));
log(`Processing: ${files}`);
/**
*
* SEGUNDA ETAPA:
* Aqui fazemos uma pequena barra de progresso para ter algum sinal
* para o user que o programa esta rodando e não está travado
*
*/
// barra de progresso
const ONE_SECOND = 1000;
// quando os outros processos acabarem ele morre junto (unref)
setInterval(() => process.stdout.write("."), ONE_SECOND).unref();
/**
*
* TERCEIRA ETAPA:
* Aqui começamos a trabalhar propriamente com as streams
* Lendo nossos arquivos ( createReadStream, streams, combinedStreams ) e juntando as streams
* dos arquivos que lemos em uma só ( StreamConcat )
* Transformando nossos dados de cvs para json ( csvtojson() )
* Manipulando os dados e pegando apenas os dados que queremos (Transform / handleStream)
* Transformando os dados de json para csv
* Por fim, enviando esses dados para alguma saída (createWriteStream / Writable stream)
*
* Essas etapas são definidas no nosso pipeline (pipelineAsync).
*
* Chunk = nossa dado / um pedaço do arquivo que foi lido
* cb = callback / o que estamos retornando (1º param: error | 2º param: nosso dado)
*
*/
// lendo os arquivos (readable stream)
const streams = files.map(
item => createReadStream(join(filesDir, item))
)
const combinedStreams = new StreamConcat(streams);
// trabalhando com os dados dos arquivos - pegando apenas os dados que queremos - mudando tipo dos dados
const handleStream = new Transform({
transform: (chunk, enconding, cb) => {
const data = JSON.parse(chunk);
const output = {
id: data.Respondent,
country: data.Country
}
// log(`Id: ${output.id}`);
return cb(null, JSON.stringify(output));
}
})
// escrevendo os dados (writable stream)
const finalStream = createWriteStream(output);
// etapas de execução da stream / pipeline
await pipelineAsync(
combinedStreams,
csvtojson(),
handleStream,
jsontocsv(),
finalStream
);
log(`${files.length} files merged! on ${output}`);
console.timeEnd("concat-data");