embulk-input-jdbc 0.9.2 → 0.9.3

This diff represents the content of publicly available package versions that have been released to one of the supported registries. The information contained in this diff is provided for informational purposes only and reflects changes between package versions as they appear in their respective public registries.
checksums.yaml CHANGED
@@ -1,7 +1,7 @@
1
1
  ---
2
2
  SHA1:
3
- metadata.gz: a95b2480f16f4ff5e62ce7ecf7fbfc88f6bded02
4
- data.tar.gz: 38cd9b33ba3820ec82bc961eaafe8642c6807d6f
3
+ metadata.gz: 89f25bb2c92757d6e3f6aad8cdd9721d58eb216c
4
+ data.tar.gz: f1195041a5488157685ba4f435b5e0e67276f107
5
5
  SHA512:
6
- metadata.gz: e5ffa68e4a72bd90e97829223d7f02925779f524c55073846ea0c5fd083b7291b81a6d3eeca9103aeaeb30a0eda6dcdffb5a48f1518ab383ba32e918b5bfdc93
7
- data.tar.gz: e900e943ba5da276aad6066549664fa67eb219b6a78d3a33673fce54c8232841410b18d469e05cdc086f4f17c123a34d395f61831613d98640494df088fa7b03
6
+ metadata.gz: 66aded100ed9af8b837e9b9d12cf0d43cda252a5f909b6d0d0bbea97143fd3165d02882c5e54aaa3c1f49ab2e4fd804fa7f4bb33c54a915aca282a0be63588ef
7
+ data.tar.gz: 95bb8bea452106e630740f6ea239a2556c8bee1c36be9fef6318f2be97efc53eefc8844b342b7aba7ea5e659941b63527528ee6392dba9becbab67c1497eafdb
data/README.md CHANGED
@@ -22,6 +22,7 @@ Generic JDBC input plugin for Embulk loads records from a database using a JDBC
22
22
  - **options**: extra JDBC properties (hash, default: {})
23
23
  - If you write SQL directly,
24
24
  - **query**: SQL to run (string)
25
+ - **use_raw_query_with_incremental**: If true, you can write your own optimized query containing `:column` placeholders for incremental loading. See [Use incremental loading with raw query](#use-incremental-loading-with-raw-query) for more detail (boolean, default: false)
25
26
  - If **query** is not set,
26
27
  - **table**: destination table name (string, required)
27
28
  - **select**: expression of select (e.g. `id, created_at`) (string, default: "*")
@@ -76,6 +77,38 @@ CREATE INDEX embulk_incremental_loading_index ON table (updated_at, id);
76
77
 
77
78
  Recommended usage is to leave `incremental_columns` unset and let this plugin automatically find an auto-increment primary key. Currently, only strings and integers are supported as incremental_columns.
78
79
 
80
+ ### Use incremental loading with raw query
81
+
82
+ **IMPORTANT**: This is an advanced feature and assumes you have sufficient knowledge of incremental loading with Embulk and this plugin.
83
+
84
+ Normally, you can't write your own query for incremental loading.
85
+ The `use_raw_query_with_incremental` option allows you to write a raw query for incremental loading. Such a query may be better optimized, and therefore faster, than the SQL statement the plugin generates automatically.
86
+
87
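+ Instead of fixed values, you can use named placeholders that start with `:` (for example `:foo_id`). Each name listed in `incremental_columns` must appear as such a placeholder in the query.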
88
+ `last_record` is required when you use this option, and it must contain exactly one value for each entry in `incremental_columns`.
89
+ Use placeholder names that are easy to distinguish within the SQL statement. A very short placeholder such as `:a` can also match other parts of the query and cause an SQL parse failure.
90
+
91
+ In the following example, the placeholder `:foo_id` is replaced with the value `1` specified in `last_record`.
92
+
93
+ ```yaml
94
+ in:
95
+ type: jdbc
96
+ query:
97
+ SELECT
98
+ foo.id as foo_id, bar.name
99
+ FROM
100
+ foo LEFT JOIN bar ON foo.id = bar.id
101
+ WHERE
102
+ foo.hoge IS NOT NULL
103
+ AND foo.id > :foo_id
104
+ ORDER BY
105
+ foo.id ASC
106
+ use_raw_query_with_incremental: true
107
+ incremental_columns:
108
+ - foo_id
109
+ incremental: true
110
+ last_record: [1]
111
+ ```
79
112
 
80
113
  ## Example
81
114
 
@@ -6,12 +6,14 @@ import java.net.MalformedURLException;
6
6
  import java.net.URISyntaxException;
7
7
  import java.net.URL;
8
8
  import java.nio.file.Path;
9
+ import java.util.Comparator;
9
10
  import java.util.List;
10
11
  import java.util.Map;
11
12
  import java.util.Properties;
12
13
  import java.nio.file.Paths;
13
14
  import java.sql.ResultSet;
14
15
  import java.sql.SQLException;
16
+ import java.util.TreeMap;
15
17
 
16
18
  import org.slf4j.Logger;
17
19
 
@@ -67,6 +69,10 @@ public abstract class AbstractJdbcInputPlugin
67
69
  @ConfigDefault("null")
68
70
  public Optional<String> getQuery();
69
71
 
72
+ @Config("use_raw_query_with_incremental")
73
+ @ConfigDefault("false")
74
+ public boolean getUseRawQueryWithIncremental();
75
+
70
76
  @Config("select")
71
77
  @ConfigDefault("null")
72
78
  public Optional<String> getSelect();
@@ -210,7 +216,29 @@ public abstract class AbstractJdbcInputPlugin
210
216
  // build SELECT query and gets schema of its result
211
217
  String rawQuery = getRawQuery(task, con);
212
218
 
213
- JdbcSchema querySchema = con.getSchemaOfQuery(rawQuery);
219
+ JdbcSchema querySchema = null;
220
+ if (task.getUseRawQueryWithIncremental()) {
221
+ String temporaryQuery = rawQuery;
222
+
223
+ // Insert pair of columnName:columnIndex order by column name length DESC
224
+ TreeMap<String, Integer> columnNames = new TreeMap<>(new Comparator<String>() {
225
+ @Override
226
+ public int compare(String val1, String val2) {
227
+ return val2.length() - val1.length();
228
+ }
229
+ });
230
+ for (int i = 0; i < task.getIncrementalColumns().size(); i++) {
231
+ columnNames.put(task.getIncrementalColumns().get(i), i);
232
+ }
233
+
234
+ for (Map.Entry<String, Integer> column : columnNames.entrySet()) {
235
+ // Temporary replace place holder like ":id" with "?" to avoid SyntaxException while getting schema.
236
+ temporaryQuery = temporaryQuery.replace(":" + column.getKey(), "?");
237
+ }
238
+ querySchema = con.getSchemaOfQuery(temporaryQuery);
239
+ } else {
240
+ querySchema = con.getSchemaOfQuery(rawQuery);
241
+ }
214
242
  task.setQuerySchema(querySchema);
215
243
  // query schema should not change after incremental query
216
244
 
@@ -251,13 +279,13 @@ public abstract class AbstractJdbcInputPlugin
251
279
  }
252
280
 
253
281
  if (task.getQuery().isPresent()) {
254
- preparedQuery = con.wrapIncrementalQuery(rawQuery, querySchema, incrementalColumnIndexes, lastRecord);
282
+ preparedQuery = con.wrapIncrementalQuery(rawQuery, querySchema, incrementalColumns, lastRecord, task.getUseRawQueryWithIncremental());
255
283
  }
256
284
  else {
257
285
  preparedQuery = con.rebuildIncrementalQuery(
258
286
  task.getTable().get(), task.getSelect(),
259
287
  task.getWhere(),
260
- querySchema, incrementalColumnIndexes, lastRecord);
288
+ querySchema, incrementalColumns, lastRecord);
261
289
  }
262
290
  }
263
291
  else {
@@ -330,8 +358,21 @@ public abstract class AbstractJdbcInputPlugin
330
358
  if (task.getTable().isPresent() || task.getSelect().isPresent() ||
331
359
  task.getWhere().isPresent() || task.getOrderBy().isPresent()) {
332
360
  throw new ConfigException("'table', 'select', 'where' and 'order_by' parameters are unnecessary if 'query' parameter is set.");
333
- } else if (!task.getIncrementalColumns().isEmpty() || task.getLastRecord().isPresent()) {
334
- throw new ConfigException("'incremental_columns' and 'last_record' parameters are not supported if 'query' parameter is set.");
361
+ } else if (task.getUseRawQueryWithIncremental()) {
362
+ String rawQuery = task.getQuery().get();
363
+ for (String columnName : task.getIncrementalColumns()) {
364
+ if (!rawQuery.contains(":" + columnName)) {
365
+ throw new ConfigException(String.format("Column \":%s\" doesn't exist in query string", columnName));
366
+ }
367
+ }
368
+ if (!task.getLastRecord().isPresent()) {
369
+ throw new ConfigException("'last_record' is required when 'use_raw_query_with_incremental' is set to true");
370
+ }
371
+ if (task.getLastRecord().get().size() != task.getIncrementalColumns().size()) {
372
+ throw new ConfigException("size of 'last_record' is different from of 'incremental_columns'");
373
+ }
374
+ } else if (!task.getUseRawQueryWithIncremental() && (!task.getIncrementalColumns().isEmpty() || task.getLastRecord().isPresent())) {
375
+ throw new ConfigException("'incremental_columns' and 'last_record' parameters are not supported if 'query' parameter is set and 'use_raw_query_with_incremental' is set to false.");
335
376
  }
336
377
  return task.getQuery().get();
337
378
  } else if (task.getTable().isPresent()) {
@@ -7,17 +7,19 @@ import java.sql.ResultSet;
7
7
  import java.sql.ResultSetMetaData;
8
8
  import java.sql.SQLException;
9
9
  import java.sql.Statement;
10
+ import java.util.Comparator;
10
11
  import java.util.Locale;
12
+ import java.util.Map;
13
+ import java.util.Map.Entry;
11
14
  import java.util.Set;
12
15
 
13
- import org.embulk.config.ConfigException;
14
16
  import org.embulk.spi.Exec;
15
17
  import org.embulk.input.jdbc.getter.ColumnGetter;
16
18
  import org.slf4j.Logger;
17
19
 
18
20
  import java.util.List;
19
21
  import java.util.ArrayList;
20
- import static java.util.Locale.ENGLISH;
22
+ import java.util.TreeMap;
21
23
 
22
24
  import com.fasterxml.jackson.annotation.JsonCreator;
23
25
  import com.fasterxml.jackson.annotation.JsonProperty;
@@ -252,7 +254,7 @@ public class JdbcInputConnection
252
254
  public PreparedQuery rebuildIncrementalQuery(String tableName,
253
255
  Optional<String> selectExpression, Optional<String> whereCondition,
254
256
  JdbcSchema querySchema,
255
- List<Integer> incrementalColumnIndexes, List<JsonNode> incrementalValues) throws SQLException
257
+ List<String> incrementalColumns, List<JsonNode> incrementalValues) throws SQLException
256
258
  {
257
259
  List<JdbcLiteral> parameters = ImmutableList.of();
258
260
 
@@ -268,7 +270,7 @@ public class JdbcInputConnection
268
270
 
269
271
  sb.append("(");
270
272
  parameters = buildIncrementalConditionTo(sb,
271
- querySchema, incrementalColumnIndexes, incrementalValues);
273
+ querySchema, incrementalColumns, incrementalValues);
272
274
  sb.append(")");
273
275
 
274
276
  newWhereCondition = Optional.of(sb.toString());
@@ -280,7 +282,7 @@ public class JdbcInputConnection
280
282
  Optional<String> newOrderByExpression;
281
283
  {
282
284
  StringBuilder sb = new StringBuilder();
283
- buildIncrementalOrderTo(sb, querySchema, incrementalColumnIndexes);
285
+ buildIncrementalOrderTo(sb, querySchema, incrementalColumns);
284
286
  newOrderByExpression = Optional.of(sb.toString());
285
287
  }
286
288
 
@@ -292,38 +294,43 @@ public class JdbcInputConnection
292
294
  }
293
295
 
294
296
  public PreparedQuery wrapIncrementalQuery(String rawQuery, JdbcSchema querySchema,
295
- List<Integer> incrementalColumnIndexes, List<JsonNode> incrementalValues) throws SQLException
297
+ List<String> incrementalColumns, List<JsonNode> incrementalValues,
298
+ boolean useRawQuery) throws SQLException
296
299
  {
297
300
  StringBuilder sb = new StringBuilder();
298
301
  List<JdbcLiteral> parameters = ImmutableList.of();
299
302
 
300
- sb.append("SELECT * FROM (");
301
- sb.append(truncateStatementDelimiter(rawQuery));
302
- sb.append(") embulk_incremental_");
303
+ if (useRawQuery) {
304
+ parameters = replacePlaceholder(sb, rawQuery, querySchema, incrementalColumns, incrementalValues);
305
+ } else {
306
+ sb.append("SELECT * FROM (");
307
+ sb.append(truncateStatementDelimiter(rawQuery));
308
+ sb.append(") embulk_incremental_");
309
+
310
+ if (incrementalValues != null) {
311
+ sb.append(" WHERE ");
312
+ parameters = buildIncrementalConditionTo(sb,
313
+ querySchema, incrementalColumns, incrementalValues);
314
+ }
303
315
 
304
- if (incrementalValues != null) {
305
- sb.append(" WHERE ");
306
- parameters = buildIncrementalConditionTo(sb,
307
- querySchema, incrementalColumnIndexes, incrementalValues);
316
+ sb.append(" ORDER BY ");
317
+ buildIncrementalOrderTo(sb, querySchema, incrementalColumns);
308
318
  }
309
319
 
310
- sb.append(" ORDER BY ");
311
- buildIncrementalOrderTo(sb, querySchema, incrementalColumnIndexes);
312
-
313
320
  return new PreparedQuery(sb.toString(), parameters);
314
321
  }
315
322
 
316
323
  private List<JdbcLiteral> buildIncrementalConditionTo(
317
324
  StringBuilder sb,
318
325
  JdbcSchema querySchema,
319
- List<Integer> incrementalColumnIndexes, List<JsonNode> incrementalValues) throws SQLException
326
+ List<String> incrementalColumns, List<JsonNode> incrementalValues) throws SQLException
320
327
  {
321
328
  ImmutableList.Builder<JdbcLiteral> parameters = ImmutableList.builder();
322
329
 
323
330
  List<String> leftColumnNames = new ArrayList<>();
324
331
  List<JdbcLiteral> rightLiterals = new ArrayList<>();
325
- for (int n = 0; n < incrementalColumnIndexes.size(); n++) {
326
- int columnIndex = incrementalColumnIndexes.get(n);
332
+ for (int n = 0; n < incrementalColumns.size(); n++) {
333
+ int columnIndex = findIncrementalColumnIndex(querySchema, incrementalColumns.get(n));
327
334
  JsonNode value = incrementalValues.get(n);
328
335
  leftColumnNames.add(querySchema.getColumnName(columnIndex));
329
336
  rightLiterals.add(new JdbcLiteral(columnIndex, value));
@@ -351,17 +358,102 @@ public class JdbcInputConnection
351
358
  return parameters.build();
352
359
  }
353
360
 
361
+ private int findIncrementalColumnIndex(JdbcSchema schema, String incrementalColumn)
362
+ throws SQLException
363
+ {
364
+ Optional<Integer> index = schema.findColumn(incrementalColumn);
365
+ // must be present because already checked in AbstractJdbcInputPlugin.findIncrementalColumnIndexes .
366
+ return index.get().intValue();
367
+ }
368
+
369
+ private List<JdbcLiteral> replacePlaceholder(StringBuilder sb, String rawQuery, JdbcSchema querySchema,
370
+ List<String> incrementalColumns, List<JsonNode> incrementalValues)
371
+ throws SQLException
372
+ {
373
+ // Insert pair of columnName:columnIndex order by column name length DESC
374
+ TreeMap<String, Integer> columnNames = new TreeMap<>(new Comparator<String>() {
375
+ @Override
376
+ public int compare(String val1, String val2) {
377
+ return val2.length() - val1.length();
378
+ }
379
+ });
380
+
381
+ ImmutableList.Builder<JdbcLiteral> parameters = ImmutableList.builder();
382
+ for (String columnName : incrementalColumns) {
383
+ int columnIndex = findIncrementalColumnIndex(querySchema, columnName);
384
+ columnNames.put(columnName, columnIndex);
385
+ }
386
+
387
+ // Add value of each columns
388
+ for (Map.Entry<Integer, Integer> columnPosition: generateColumnPositionList(rawQuery, columnNames).entrySet()) {
389
+ int columnIndex = columnPosition.getValue();
390
+ JsonNode value = incrementalValues.get(columnIndex);
391
+ parameters.add(new JdbcLiteral(columnIndex, value));
392
+ }
393
+
394
+ // Replace placeholder ":column1" string with "?"
395
+ for (Entry<String, Integer> column : columnNames.entrySet()) {
396
+ String columnName = column.getKey();
397
+ while (rawQuery.contains(":" + columnName)) {
398
+ rawQuery = rawQuery.replaceFirst(":" + columnName, "?");
399
+ }
400
+ }
401
+
402
+ sb.append(rawQuery);
403
+
404
+ return parameters.build();
405
+ }
406
+
407
+ /*
408
+ * This method parse original query that contains placeholder ":column" and store its index position and columnIndex value in Map
409
+ *
410
+ * @param query string that contains placeholder like ":column"
411
+ * @param pair of columnName:columnIndex sorted by column name length desc ["num2", 1]["num", 0]
412
+ * @return pair of index position where ":column" appears and columnIndex sorted by index position [65,0][105,0][121,1]
413
+ *
414
+ * last_record: [1,101]
415
+ * SELECT * FROM query_load WHERE
416
+ * num IS NOT NULL
417
+ * AND num > :num
418
+ * AND num2 IS NOT NULL
419
+ * OR (num = :num AND num2 > :num2)
420
+ * ORDER BY num ASC, num2 ASC
421
+ * in above case, return value will be [65,0][105,0][121,1]
422
+ */
423
+ private TreeMap<Integer, Integer> generateColumnPositionList(String rawQuery, TreeMap<String, Integer> columnNames)
424
+ {
425
+ TreeMap<Integer, Integer> columnPositionList = new TreeMap<>();
426
+
427
+ for (Entry<String, Integer> column : columnNames.entrySet()) {
428
+ int lastIndex = 0;
429
+ while (true) {
430
+ int index = rawQuery.indexOf(":" + column.getKey(), lastIndex);
431
+ if (index == -1) {
432
+ break;
433
+ }
434
+ if (!columnPositionList.containsKey(index)) {
435
+ columnPositionList.put(index, column.getValue());
436
+ }
437
+ lastIndex = index + 2;
438
+ }
439
+ }
440
+ return columnPositionList;
441
+ }
442
+
354
443
  private void buildIncrementalOrderTo(StringBuilder sb,
355
- JdbcSchema querySchema, List<Integer> incrementalColumnIndexes)
444
+ JdbcSchema querySchema, List<String> incrementalColumns) throws SQLException
356
445
  {
357
446
  boolean first = true;
358
- for (int i : incrementalColumnIndexes) {
447
+ for (String incrementalColumn : incrementalColumns) {
359
448
  if (first) {
360
449
  first = false;
361
450
  } else {
362
451
  sb.append(", ");
363
452
  }
364
- sb.append(quoteIdentifierString(querySchema.getColumnName(i)));
453
+ int columnIndex = findIncrementalColumnIndex(querySchema, incrementalColumn);
454
+ // the following column name is case sensitive,
455
+ // so should use actual column name got by DatabaseMetaData.
456
+ sb.append(quoteIdentifierString(querySchema.getColumnName(columnIndex)));
365
457
  }
366
458
  }
367
459
 
metadata CHANGED
@@ -1,14 +1,14 @@
1
1
  --- !ruby/object:Gem::Specification
2
2
  name: embulk-input-jdbc
3
3
  version: !ruby/object:Gem::Version
4
- version: 0.9.2
4
+ version: 0.9.3
5
5
  platform: ruby
6
6
  authors:
7
7
  - Sadayuki Furuhashi
8
8
  autorequire:
9
9
  bindir: bin
10
10
  cert_chain: []
11
- date: 2018-07-03 00:00:00.000000000 Z
11
+ date: 2018-08-10 00:00:00.000000000 Z
12
12
  dependencies: []
13
13
  description: Selects records from a table.
14
14
  email:
@@ -19,7 +19,7 @@ extra_rdoc_files: []
19
19
  files:
20
20
  - README.md
21
21
  - build.gradle
22
- - classpath/embulk-input-jdbc-0.9.2.jar
22
+ - classpath/embulk-input-jdbc-0.9.3.jar
23
23
  - lib/embulk/input/jdbc.rb
24
24
  - src/main/java/org/embulk/input/JdbcInputPlugin.java
25
25
  - src/main/java/org/embulk/input/jdbc/AbstractJdbcInputPlugin.java