embulk-input-jdbc 0.9.2 → 0.9.3
This diff represents the content of publicly available package versions that have been released to one of the supported registries. The information contained in this diff is provided for informational purposes only and reflects changes between package versions as they appear in their respective public registries.
- checksums.yaml +4 -4
- data/README.md +33 -0
- data/classpath/embulk-input-jdbc-0.9.3.jar +0 -0
- data/src/main/java/org/embulk/input/jdbc/AbstractJdbcInputPlugin.java +46 -5
- data/src/main/java/org/embulk/input/jdbc/JdbcInputConnection.java +114 -22
- metadata +3 -3
- data/classpath/embulk-input-jdbc-0.9.2.jar +0 -0
checksums.yaml
CHANGED
@@ -1,7 +1,7 @@
|
|
1
1
|
---
|
2
2
|
SHA1:
|
3
|
-
metadata.gz:
|
4
|
-
data.tar.gz:
|
3
|
+
metadata.gz: 89f25bb2c92757d6e3f6aad8cdd9721d58eb216c
|
4
|
+
data.tar.gz: f1195041a5488157685ba4f435b5e0e67276f107
|
5
5
|
SHA512:
|
6
|
-
metadata.gz:
|
7
|
-
data.tar.gz:
|
6
|
+
metadata.gz: 66aded100ed9af8b837e9b9d12cf0d43cda252a5f909b6d0d0bbea97143fd3165d02882c5e54aaa3c1f49ab2e4fd804fa7f4bb33c54a915aca282a0be63588ef
|
7
|
+
data.tar.gz: 95bb8bea452106e630740f6ea239a2556c8bee1c36be9fef6318f2be97efc53eefc8844b342b7aba7ea5e659941b63527528ee6392dba9becbab67c1497eafdb
|
data/README.md
CHANGED
@@ -22,6 +22,7 @@ Generic JDBC input plugin for Embulk loads records from a database using a JDBC
|
|
22
22
|
- **options**: extra JDBC properties (hash, default: {})
|
23
23
|
- If you write SQL directly,
|
24
24
|
- **query**: SQL to run (string)
|
25
|
+
- **use_raw_query_with_incremental**: If true, you can write your own optimized query for incremental loading, using placeholders in the style of a prepared statement. See [Use incremental loading with raw query](#use-incremental-loading-with-raw-query) for more details (boolean, default: false)
|
25
26
|
- If **query** is not set,
|
26
27
|
- **table**: destination table name (string, required)
|
27
28
|
- **select**: expression of select (e.g. `id, created_at`) (string, default: "*")
|
@@ -76,6 +77,38 @@ CREATE INDEX embulk_incremental_loading_index ON table (updated_at, id);
|
|
76
77
|
|
77
78
|
Recommended usage is to leave `incremental_columns` unset and let this plugin automatically find an auto-increment primary key. Currently, only strings and integers are supported as incremental_columns.
|
78
79
|
|
80
|
+
### Use incremental loading with raw query
|
81
|
+
|
82
|
+
**IMPORTANT**: This is an advanced feature and assumes you have sufficient knowledge of incremental loading with Embulk and this plugin.
|
83
|
+
|
84
|
+
Normally, you can't write your own query for incremental loading.
|
85
|
+
The `use_raw_query_with_incremental` option allows you to write a raw query for incremental loading. A hand-written query can be better optimized and faster than the SQL statement that the plugin generates automatically.
|
86
|
+
|
87
|
+
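A placeholder that starts with `:` followed by a column name (for example `:foo_id`) can be used in the query in place of a fixed value. Every column listed in `incremental_columns` must appear in the query as such a placeholder.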
|
88
|
+
The `last_record` value is required when you use this option, and it must contain one value for each entry in `incremental_columns`.
|
89
|
+
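Please use placeholder names that are clearly distinguishable within the SQL statement. Placeholders are substituted by plain text replacement, so an overly simple name such as `:a` may also match unrelated text in the query and cause an SQL parse failure.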
|
90
|
+
|
91
|
+
In the following example, the placeholder `:foo_id` will be replaced with the value `1`, which is specified in `last_record`.
|
92
|
+
|
93
|
+
```yaml
|
94
|
+
in:
|
95
|
+
type: jdbc
|
96
|
+
query:
|
97
|
+
SELECT
|
98
|
+
foo.id as foo_id, bar.name
|
99
|
+
FROM
|
100
|
+
foo LEFT JOIN bar ON foo.id = bar.id
|
101
|
+
WHERE
|
102
|
+
foo.hoge IS NOT NULL
|
103
|
+
AND foo.id > :foo_id
|
104
|
+
ORDER BY
|
105
|
+
foo.id ASC
|
106
|
+
use_raw_query_with_incremental: true
|
107
|
+
incremental_columns:
|
108
|
+
- foo_id
|
109
|
+
incremental: true
|
110
|
+
last_record: [1]
|
111
|
+
```
|
79
112
|
|
80
113
|
## Example
|
81
114
|
|
Binary file
|
@@ -6,12 +6,14 @@ import java.net.MalformedURLException;
|
|
6
6
|
import java.net.URISyntaxException;
|
7
7
|
import java.net.URL;
|
8
8
|
import java.nio.file.Path;
|
9
|
+
import java.util.Comparator;
|
9
10
|
import java.util.List;
|
10
11
|
import java.util.Map;
|
11
12
|
import java.util.Properties;
|
12
13
|
import java.nio.file.Paths;
|
13
14
|
import java.sql.ResultSet;
|
14
15
|
import java.sql.SQLException;
|
16
|
+
import java.util.TreeMap;
|
15
17
|
|
16
18
|
import org.slf4j.Logger;
|
17
19
|
|
@@ -67,6 +69,10 @@ public abstract class AbstractJdbcInputPlugin
|
|
67
69
|
@ConfigDefault("null")
|
68
70
|
public Optional<String> getQuery();
|
69
71
|
|
72
|
+
@Config("use_raw_query_with_incremental")
|
73
|
+
@ConfigDefault("false")
|
74
|
+
public boolean getUseRawQueryWithIncremental();
|
75
|
+
|
70
76
|
@Config("select")
|
71
77
|
@ConfigDefault("null")
|
72
78
|
public Optional<String> getSelect();
|
@@ -210,7 +216,29 @@ public abstract class AbstractJdbcInputPlugin
|
|
210
216
|
// build SELECT query and gets schema of its result
|
211
217
|
String rawQuery = getRawQuery(task, con);
|
212
218
|
|
213
|
-
JdbcSchema querySchema =
|
219
|
+
JdbcSchema querySchema = null;
|
220
|
+
if (task.getUseRawQueryWithIncremental()) {
|
221
|
+
String temporaryQuery = rawQuery;
|
222
|
+
|
223
|
+
// Insert pair of columnName:columnIndex order by column name length DESC
|
224
|
+
TreeMap<String, Integer> columnNames = new TreeMap<>(new Comparator<String>() {
|
225
|
+
@Override
|
226
|
+
public int compare(String val1, String val2) {
|
227
|
+
return val2.length() - val1.length();
|
228
|
+
}
|
229
|
+
});
|
230
|
+
for (int i = 0; i < task.getIncrementalColumns().size(); i++) {
|
231
|
+
columnNames.put(task.getIncrementalColumns().get(i), i);
|
232
|
+
}
|
233
|
+
|
234
|
+
for (Map.Entry<String, Integer> column : columnNames.entrySet()) {
|
235
|
+
// Temporary replace place holder like ":id" with "?" to avoid SyntaxException while getting schema.
|
236
|
+
temporaryQuery = temporaryQuery.replace(":" + column.getKey(), "?");
|
237
|
+
}
|
238
|
+
querySchema = con.getSchemaOfQuery(temporaryQuery);
|
239
|
+
} else {
|
240
|
+
querySchema = con.getSchemaOfQuery(rawQuery);
|
241
|
+
}
|
214
242
|
task.setQuerySchema(querySchema);
|
215
243
|
// query schema should not change after incremental query
|
216
244
|
|
@@ -251,13 +279,13 @@ public abstract class AbstractJdbcInputPlugin
|
|
251
279
|
}
|
252
280
|
|
253
281
|
if (task.getQuery().isPresent()) {
|
254
|
-
preparedQuery = con.wrapIncrementalQuery(rawQuery, querySchema,
|
282
|
+
preparedQuery = con.wrapIncrementalQuery(rawQuery, querySchema, incrementalColumns, lastRecord, task.getUseRawQueryWithIncremental());
|
255
283
|
}
|
256
284
|
else {
|
257
285
|
preparedQuery = con.rebuildIncrementalQuery(
|
258
286
|
task.getTable().get(), task.getSelect(),
|
259
287
|
task.getWhere(),
|
260
|
-
querySchema,
|
288
|
+
querySchema, incrementalColumns, lastRecord);
|
261
289
|
}
|
262
290
|
}
|
263
291
|
else {
|
@@ -330,8 +358,21 @@ public abstract class AbstractJdbcInputPlugin
|
|
330
358
|
if (task.getTable().isPresent() || task.getSelect().isPresent() ||
|
331
359
|
task.getWhere().isPresent() || task.getOrderBy().isPresent()) {
|
332
360
|
throw new ConfigException("'table', 'select', 'where' and 'order_by' parameters are unnecessary if 'query' parameter is set.");
|
333
|
-
} else if (
|
334
|
-
|
361
|
+
} else if (task.getUseRawQueryWithIncremental()) {
|
362
|
+
String rawQuery = task.getQuery().get();
|
363
|
+
for (String columnName : task.getIncrementalColumns()) {
|
364
|
+
if (!rawQuery.contains(":" + columnName)) {
|
365
|
+
throw new ConfigException(String.format("Column \":%s\" doesn't exist in query string", columnName));
|
366
|
+
}
|
367
|
+
}
|
368
|
+
if (!task.getLastRecord().isPresent()) {
|
369
|
+
throw new ConfigException("'last_record' is required when 'use_raw_query_with_incremental' is set to true");
|
370
|
+
}
|
371
|
+
if (task.getLastRecord().get().size() != task.getIncrementalColumns().size()) {
|
372
|
+
throw new ConfigException("size of 'last_record' is different from of 'incremental_columns'");
|
373
|
+
}
|
374
|
+
} else if (!task.getUseRawQueryWithIncremental() && (!task.getIncrementalColumns().isEmpty() || task.getLastRecord().isPresent())) {
|
375
|
+
throw new ConfigException("'incremental_columns' and 'last_record' parameters are not supported if 'query' parameter is set and 'use_raw_query_with_incremental' is set to false.");
|
335
376
|
}
|
336
377
|
return task.getQuery().get();
|
337
378
|
} else if (task.getTable().isPresent()) {
|
@@ -7,17 +7,19 @@ import java.sql.ResultSet;
|
|
7
7
|
import java.sql.ResultSetMetaData;
|
8
8
|
import java.sql.SQLException;
|
9
9
|
import java.sql.Statement;
|
10
|
+
import java.util.Comparator;
|
10
11
|
import java.util.Locale;
|
12
|
+
import java.util.Map;
|
13
|
+
import java.util.Map.Entry;
|
11
14
|
import java.util.Set;
|
12
15
|
|
13
|
-
import org.embulk.config.ConfigException;
|
14
16
|
import org.embulk.spi.Exec;
|
15
17
|
import org.embulk.input.jdbc.getter.ColumnGetter;
|
16
18
|
import org.slf4j.Logger;
|
17
19
|
|
18
20
|
import java.util.List;
|
19
21
|
import java.util.ArrayList;
|
20
|
-
import
|
22
|
+
import java.util.TreeMap;
|
21
23
|
|
22
24
|
import com.fasterxml.jackson.annotation.JsonCreator;
|
23
25
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
@@ -252,7 +254,7 @@ public class JdbcInputConnection
|
|
252
254
|
public PreparedQuery rebuildIncrementalQuery(String tableName,
|
253
255
|
Optional<String> selectExpression, Optional<String> whereCondition,
|
254
256
|
JdbcSchema querySchema,
|
255
|
-
List<
|
257
|
+
List<String> incrementalColumns, List<JsonNode> incrementalValues) throws SQLException
|
256
258
|
{
|
257
259
|
List<JdbcLiteral> parameters = ImmutableList.of();
|
258
260
|
|
@@ -268,7 +270,7 @@ public class JdbcInputConnection
|
|
268
270
|
|
269
271
|
sb.append("(");
|
270
272
|
parameters = buildIncrementalConditionTo(sb,
|
271
|
-
querySchema,
|
273
|
+
querySchema, incrementalColumns, incrementalValues);
|
272
274
|
sb.append(")");
|
273
275
|
|
274
276
|
newWhereCondition = Optional.of(sb.toString());
|
@@ -280,7 +282,7 @@ public class JdbcInputConnection
|
|
280
282
|
Optional<String> newOrderByExpression;
|
281
283
|
{
|
282
284
|
StringBuilder sb = new StringBuilder();
|
283
|
-
buildIncrementalOrderTo(sb, querySchema,
|
285
|
+
buildIncrementalOrderTo(sb, querySchema, incrementalColumns);
|
284
286
|
newOrderByExpression = Optional.of(sb.toString());
|
285
287
|
}
|
286
288
|
|
@@ -292,38 +294,43 @@ public class JdbcInputConnection
|
|
292
294
|
}
|
293
295
|
|
294
296
|
public PreparedQuery wrapIncrementalQuery(String rawQuery, JdbcSchema querySchema,
|
295
|
-
List<
|
297
|
+
List<String> incrementalColumns, List<JsonNode> incrementalValues,
|
298
|
+
boolean useRawQuery) throws SQLException
|
296
299
|
{
|
297
300
|
StringBuilder sb = new StringBuilder();
|
298
301
|
List<JdbcLiteral> parameters = ImmutableList.of();
|
299
302
|
|
300
|
-
|
301
|
-
|
302
|
-
|
303
|
+
if (useRawQuery) {
|
304
|
+
parameters = replacePlaceholder(sb, rawQuery, querySchema, incrementalColumns, incrementalValues);
|
305
|
+
} else {
|
306
|
+
sb.append("SELECT * FROM (");
|
307
|
+
sb.append(truncateStatementDelimiter(rawQuery));
|
308
|
+
sb.append(") embulk_incremental_");
|
309
|
+
|
310
|
+
if (incrementalValues != null) {
|
311
|
+
sb.append(" WHERE ");
|
312
|
+
parameters = buildIncrementalConditionTo(sb,
|
313
|
+
querySchema, incrementalColumns, incrementalValues);
|
314
|
+
}
|
303
315
|
|
304
|
-
|
305
|
-
sb
|
306
|
-
parameters = buildIncrementalConditionTo(sb,
|
307
|
-
querySchema, incrementalColumnIndexes, incrementalValues);
|
316
|
+
sb.append(" ORDER BY ");
|
317
|
+
buildIncrementalOrderTo(sb, querySchema, incrementalColumns);
|
308
318
|
}
|
309
319
|
|
310
|
-
sb.append(" ORDER BY ");
|
311
|
-
buildIncrementalOrderTo(sb, querySchema, incrementalColumnIndexes);
|
312
|
-
|
313
320
|
return new PreparedQuery(sb.toString(), parameters);
|
314
321
|
}
|
315
322
|
|
316
323
|
private List<JdbcLiteral> buildIncrementalConditionTo(
|
317
324
|
StringBuilder sb,
|
318
325
|
JdbcSchema querySchema,
|
319
|
-
List<
|
326
|
+
List<String> incrementalColumns, List<JsonNode> incrementalValues) throws SQLException
|
320
327
|
{
|
321
328
|
ImmutableList.Builder<JdbcLiteral> parameters = ImmutableList.builder();
|
322
329
|
|
323
330
|
List<String> leftColumnNames = new ArrayList<>();
|
324
331
|
List<JdbcLiteral> rightLiterals = new ArrayList<>();
|
325
|
-
for (int n = 0; n <
|
326
|
-
int columnIndex =
|
332
|
+
for (int n = 0; n < incrementalColumns.size(); n++) {
|
333
|
+
int columnIndex = findIncrementalColumnIndex(querySchema, incrementalColumns.get(n));
|
327
334
|
JsonNode value = incrementalValues.get(n);
|
328
335
|
leftColumnNames.add(querySchema.getColumnName(columnIndex));
|
329
336
|
rightLiterals.add(new JdbcLiteral(columnIndex, value));
|
@@ -351,17 +358,102 @@ public class JdbcInputConnection
|
|
351
358
|
return parameters.build();
|
352
359
|
}
|
353
360
|
|
361
|
+
private int findIncrementalColumnIndex(JdbcSchema schema, String incrementalColumn)
|
362
|
+
throws SQLException
|
363
|
+
{
|
364
|
+
Optional<Integer> index = schema.findColumn(incrementalColumn);
|
365
|
+
// must be present because already checked in AbstractJdbcInputPlugin.findIncrementalColumnIndexes .
|
366
|
+
return index.get().intValue();
|
367
|
+
}
|
368
|
+
|
369
|
+
private List<JdbcLiteral> replacePlaceholder(StringBuilder sb, String rawQuery, JdbcSchema querySchema,
|
370
|
+
List<String> incrementalColumns, List<JsonNode> incrementalValues)
|
371
|
+
throws SQLException
|
372
|
+
{
|
373
|
+
// Insert pair of columnName:columnIndex order by column name length DESC
|
374
|
+
TreeMap<String, Integer> columnNames = new TreeMap<>(new Comparator<String>() {
|
375
|
+
@Override
|
376
|
+
public int compare(String val1, String val2) {
|
377
|
+
return val2.length() - val1.length();
|
378
|
+
}
|
379
|
+
});
|
380
|
+
|
381
|
+
ImmutableList.Builder<JdbcLiteral> parameters = ImmutableList.builder();
|
382
|
+
for (String columnName : incrementalColumns) {
|
383
|
+
int columnIndex = findIncrementalColumnIndex(querySchema, columnName);
|
384
|
+
columnNames.put(columnName, columnIndex);
|
385
|
+
}
|
386
|
+
|
387
|
+
// Add value of each columns
|
388
|
+
for (Map.Entry<Integer, Integer> columnPosition: generateColumnPositionList(rawQuery, columnNames).entrySet()) {
|
389
|
+
int columnIndex = columnPosition.getValue();
|
390
|
+
JsonNode value = incrementalValues.get(columnIndex);
|
391
|
+
parameters.add(new JdbcLiteral(columnIndex, value));
|
392
|
+
}
|
393
|
+
|
394
|
+
// Replace placeholder ":column1" string with "?"
|
395
|
+
for (Entry<String, Integer> column : columnNames.entrySet()) {
|
396
|
+
String columnName = column.getKey();
|
397
|
+
while (rawQuery.contains(":" + columnName)) {
|
398
|
+
rawQuery = rawQuery.replaceFirst(":" + columnName, "?");
|
399
|
+
}
|
400
|
+
}
|
401
|
+
|
402
|
+
sb.append(rawQuery);
|
403
|
+
|
404
|
+
return parameters.build();
|
405
|
+
}
|
406
|
+
|
407
|
+
/*
|
408
|
+
* This method parse original query that contains placeholder ":column" and store its index position and columnIndex value in Map
|
409
|
+
*
|
410
|
+
* @param query string that contains placeholder like ":column"
|
411
|
+
* @param pair of columnName:columnIndex sorted by column name length desc ["num2", 1]["num", 0]
|
412
|
+
* @return pair of index position where ":column" appears and columnIndex sorted by index position [65,0][105,0][121,1]
|
413
|
+
*
|
414
|
+
* last_record: [1,101]
|
415
|
+
* SELECT * FROM query_load WHERE
|
416
|
+
* num IS NOT NULL
|
417
|
+
* AND num > :num
|
418
|
+
* AND num2 IS NOT NULL
|
419
|
+
* OR (num = :num AND num2 > :num2)
|
420
|
+
* ORDER BY num ASC, num2 ASC
|
421
|
+
* in above case, return value will be [65,0][105,0][121,1]
|
422
|
+
*/
|
423
|
+
private TreeMap<Integer, Integer> generateColumnPositionList(String rawQuery, TreeMap<String, Integer> columnNames)
|
424
|
+
{
|
425
|
+
TreeMap<Integer, Integer> columnPositionList = new TreeMap<>();
|
426
|
+
|
427
|
+
for (Entry<String, Integer> column : columnNames.entrySet()) {
|
428
|
+
int lastIndex = 0;
|
429
|
+
while (true) {
|
430
|
+
int index = rawQuery.indexOf(":" + column.getKey(), lastIndex);
|
431
|
+
if (index == -1) {
|
432
|
+
break;
|
433
|
+
}
|
434
|
+
if (!columnPositionList.containsKey(index)) {
|
435
|
+
columnPositionList.put(index, column.getValue());
|
436
|
+
}
|
437
|
+
lastIndex = index + 2;
|
438
|
+
}
|
439
|
+
}
|
440
|
+
return columnPositionList;
|
441
|
+
}
|
442
|
+
|
354
443
|
private void buildIncrementalOrderTo(StringBuilder sb,
|
355
|
-
JdbcSchema querySchema, List<
|
444
|
+
JdbcSchema querySchema, List<String> incrementalColumns) throws SQLException
|
356
445
|
{
|
357
446
|
boolean first = true;
|
358
|
-
for (
|
447
|
+
for (String incrementalColumn : incrementalColumns) {
|
359
448
|
if (first) {
|
360
449
|
first = false;
|
361
450
|
} else {
|
362
451
|
sb.append(", ");
|
363
452
|
}
|
364
|
-
|
453
|
+
int columnIndex = findIncrementalColumnIndex(querySchema, incrementalColumn);
|
454
|
+
// the following column name is case sensitive,
|
455
|
+
// so should use actual column name got by DatabaseMetaData.
|
456
|
+
sb.append(quoteIdentifierString(querySchema.getColumnName(columnIndex)));
|
365
457
|
}
|
366
458
|
}
|
367
459
|
|
metadata
CHANGED
@@ -1,14 +1,14 @@
|
|
1
1
|
--- !ruby/object:Gem::Specification
|
2
2
|
name: embulk-input-jdbc
|
3
3
|
version: !ruby/object:Gem::Version
|
4
|
-
version: 0.9.
|
4
|
+
version: 0.9.3
|
5
5
|
platform: ruby
|
6
6
|
authors:
|
7
7
|
- Sadayuki Furuhashi
|
8
8
|
autorequire:
|
9
9
|
bindir: bin
|
10
10
|
cert_chain: []
|
11
|
-
date: 2018-
|
11
|
+
date: 2018-08-10 00:00:00.000000000 Z
|
12
12
|
dependencies: []
|
13
13
|
description: Selects records from a table.
|
14
14
|
email:
|
@@ -19,7 +19,7 @@ extra_rdoc_files: []
|
|
19
19
|
files:
|
20
20
|
- README.md
|
21
21
|
- build.gradle
|
22
|
-
- classpath/embulk-input-jdbc-0.9.
|
22
|
+
- classpath/embulk-input-jdbc-0.9.3.jar
|
23
23
|
- lib/embulk/input/jdbc.rb
|
24
24
|
- src/main/java/org/embulk/input/JdbcInputPlugin.java
|
25
25
|
- src/main/java/org/embulk/input/jdbc/AbstractJdbcInputPlugin.java
|
Binary file
|