@@ -5,7 +5,13 @@ import { importColumnTypes } from '@xata.io/importer';
55import { open , writeFile } from 'fs/promises' ;
66import { BaseCommand } from '../../base.js' ;
77import { enumFlag } from '../../utils/oclif.js' ;
8- import { getBranchDetailsWithPgRoll } from '../../migrations/pgroll.js' ;
8+ import {
9+ getBranchDetailsWithPgRoll ,
10+ waitForMigrationToFinish ,
11+ xataColumnTypeToPgRollComment
12+ } from '../../migrations/pgroll.js' ;
13+ import { compareSchemas } from '../../utils/compareSchema.js' ;
14+ import keyBy from 'lodash.keyby' ;
915
1016const ERROR_CONSOLE_LOG_LIMIT = 200 ;
1117const ERROR_LOG_FILE = 'errors.log' ;
@@ -23,6 +29,8 @@ const bufferEncodings: BufferEncoding[] = [
2329 'hex'
2430] ;
2531
32+ const INTERNAL_COLUMNS_PGROLL = [ 'xata_id' , 'xata_createdat' , 'xata_updatedat' , 'xata_version' ] ;
33+
2634export default class ImportCSV extends BaseCommand < typeof ImportCSV > {
2735 static description = 'Import a CSV file' ;
2836
@@ -144,12 +152,26 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
144152 if ( ! parseResults . success ) {
145153 throw new Error ( 'Failed to parse CSV file' ) ;
146154 }
147- const batchRows = parseResults . data . map ( ( { data } ) => data ) ;
155+ const batchRows = parseResults . data . map ( ( { data } ) => {
156+ const formattedRow : { [ k : string ] : any } = { } ;
157+ const keys = Object . keys ( data ) ;
158+ for ( const key of keys ) {
159+ if ( INTERNAL_COLUMNS_PGROLL . includes ( key ) && key !== 'xata_id' ) continue ;
160+ formattedRow [ key ] = data [ key ] ;
161+ }
162+ return formattedRow ;
163+ } ) ;
164+
148165 const importResult = await xata . import . importBatch (
149166 { workspace, region, database, branch } ,
150- { columns : parseResults . columns , table, batchRows }
167+ {
168+ columns : parseResults . columns . filter (
169+ ( { name } ) => name === 'xata_id' || ! INTERNAL_COLUMNS_PGROLL . includes ( name )
170+ ) ,
171+ table,
172+ batchRows
173+ }
151174 ) ;
152-
153175 await xata . import . importFiles (
154176 { database, branch, region, workspace : workspace } ,
155177 {
@@ -212,22 +234,39 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
212234 const xata = await this . getXataClient ( ) ;
213235 const { workspace, region, database, branch } = await this . parseDatabase ( ) ;
214236 const { schema : existingSchema } = await getBranchDetailsWithPgRoll ( xata , { workspace, region, database, branch } ) ;
215- const newSchema = {
216- tables : [
217- ...existingSchema . tables . filter ( ( t ) => t . name !== table ) ,
218- { name : table , columns : columns . filter ( ( c ) => c . name !== 'id' ) }
219- ]
220- } ;
221237
222- const { edits } = await xata . api . migrations . compareBranchWithUserSchema ( {
223- pathParams : { workspace, region, dbBranchName : `${ database } :main` } ,
224- body : { schema : newSchema }
225- } ) ;
226- if ( edits . operations . length > 0 ) {
227- const destructiveOperations = edits . operations
238+ const { edits } = compareSchemas (
239+ { } ,
240+ {
241+ tables : {
242+ [ table ] : {
243+ name : table ,
244+ xataCompatible : false ,
245+ columns : keyBy (
246+ columns
247+ . filter ( ( c ) => ! INTERNAL_COLUMNS_PGROLL . includes ( c . name as any ) )
248+ . map ( ( c ) => {
249+ return {
250+ name : c . name ,
251+ type : c . type ,
252+ nullable : c . notNull !== false ,
253+ default : c . defaultValue ?? null ,
254+ unique : c . unique ,
255+ comment : xataColumnTypeToPgRollComment ( c )
256+ } ;
257+ } ) ,
258+ 'name'
259+ )
260+ }
261+ }
262+ }
263+ ) ;
264+
265+ if ( edits . length > 0 ) {
266+ const destructiveOperations = edits
228267 . map ( ( op ) => {
229- if ( ! ( 'removeColumn ' in op ) ) return undefined ;
230- return op . removeColumn . column ;
268+ if ( ! ( 'drop_column ' in op ) ) return undefined ;
269+ return op . drop_column . column ;
231270 } )
232271 . filter ( ( x ) => x !== undefined ) ;
233272
@@ -262,10 +301,14 @@ export default class ImportCSV extends BaseCommand<typeof ImportCSV> {
262301 process . exit ( 1 ) ;
263302 }
264303
265- await xata . api . migrations . applyBranchSchemaEdit ( {
304+ const { jobID } = await xata . api . migrations . applyMigration ( {
266305 pathParams : { workspace, region, dbBranchName : `${ database } :${ branch } ` } ,
267- body : { edits }
306+ body : {
307+ adaptTables : true ,
308+ operations : edits
309+ }
268310 } ) ;
311+ await waitForMigrationToFinish ( xata . api , workspace , region , database , branch , jobID ) ;
269312 }
270313 }
271314}
0 commit comments