sparksql插入postgresql 字段格式不匹配报错处理

2021/7/12 19:06:40

本文主要是介绍sparksql插入postgresql 字段格式不匹配报错处理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1、错误关键信息

Caused by: org.postgresql.util.PSQLException: ERROR: column "c1" is of type point but expression is of type character
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859) 

详细异常样例:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, info3, executor 1): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO sink_pg_newtable_10 ("column1","column2","column3","column4","column5","column6","column7","column8","column9","column10","column11","column13","column14","column15","column16","column17","column18","column19","column20","column21","column22","column23","column24","column25","column27","column28","column29","column30","column31","column32","column33","column34","column35","column36","column37","column38","column39","column40","column41","column42","column44","column45","column46","column47","column48","column49","column50","column51","column52","column53","column54","column55","column56","column57","column58","column59","column60") VALUES (NULL,'2','true','false',NULL,'(1,2),(1,2)',NULL,'1','192','192.168.0.0/32','<(1,1),5>',NULL,'[2020-09-10,2020-10-09)',NULL,NULL,NULL,'192.168.1.32','11','1','11','[2,4)','11','[124,456)','78','11 days 11:11:11','{"a":1,"b":2}','{"a": 1, "b": 2}','$."action"','{1,-1,0}','[(1,1),(2,2)]','08:00:2b:01:02:03','08:00:2b:ff:fe:01:02:03','11.0',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'3','3','3',NULL,'3',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) was aborted: ERROR: column "column1" is of type bigint but expression is of type character
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:862)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:901)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "column1" is of type bigint but expression is of type character
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:978)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:978)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:68)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	......
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO sink_pg_newtable_10 ("column1","column2","column3","column4","column5","column6","column7","column8","column9","column10","column11","column13","column14","column15","column16","column17","column18","column19","column20","column21","column22","column23","column24","column25","column27","column28","column29","column30","column31","column32","column33","column34","column35","column36","column37","column38","column39","column40","column41","column42","column44","column45","column46","column47","column48","column49","column50","column51","column52","column53","column54","column55","column56","column57","column58","column59","column60") VALUES (NULL,'2','true','false',NULL,'(1,2),(1,2)',NULL,'1','192','192.168.0.0/32','<(1,1),5>',NULL,'[2020-09-10,2020-10-09)',NULL,NULL,NULL,'192.168.1.32','11','1','11','[2,4)','11','[124,456)','78','11 days 11:11:11','{"a":1,"b":2}','{"a": 1, "b": 2}','$."action"','{1,-1,0}','[(1,1),(2,2)]','08:00:2b:01:02:03','08:00:2b:ff:fe:01:02:03','11.0',NULL,NULL,NULL,NULL,NULL,NULL,NULL,'3','3','3',NULL,'3',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL) was aborted: ERROR: column "column1" is of type bigint but expression is of type character
	at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:169)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:862)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:901)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:676)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable$1.apply(JdbcUtils.scala:838)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:980)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "column1" is of type bigint but expression is of type character
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:859)

2、解决方案

     这里要分为两种情况去考虑:

  1)第一种情况,插入字段值不为 NULL

        这种情况主要是数据类型不匹配,pg在数据插入时不能正确的类型转换,举例说明:如 字段 c1 类型为point,值为 (1,2),则插入语句像 "insert into table (c1) values('(1,2)'); ",这里的c1字段值默认被按照varchar类型处理了,pg不能正确的将它转换为 point类型。那么解决方案就很简单,配置数据库连接url参数即可:"stringtype=unspecified",申明pg自动处理字符串转换逻辑。

   

url=jdbc:postgresql://192.168.1.84:5432/postgres?stringtype=unspecified

 2)第二种情况,插入字段值为NULL

        这种情况问题主要出在sparksql代码里,我们来分析一下。sparksql在进行jdbc操作时需要对数据类型转换,spark里面支持的类型和数据库支持的类型需要映射,这部分实现主要在不同的数据库方言里面,如pg数据库的方言实现类:org.apache.spark.sql.jdbc.PostgresDialect,sparksql在使用的时候通过JdbcDialects.registerDialect(PostgresDialect)注册,然后使用。

        那么我们来举一个例子分析一下问题解决方案,还是c1字段,值为NULL,类型为point。那么在执行sql "insert into table (c1) values(NULL); ",按理说这句sql在pg里面直接执行是不报错的如果字段没有非空约束的话。为什么报错了呢?

      debug 跟代码到sparksql中类org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.savePartition()方法中第661行,这里有通过PrepareStatement设置参数的方式设置字段值,当值为NULL时,这里调用的是 stmt.setNull(i + 1, nullTypes(i)) 方法专门来处理NULL值。

while (i < numFields) {
            if (row.isNullAt(i)) {
              stmt.setNull(i + 1, nullTypes(i)) //注意此处专门处理NULL值
            } else {
              setters(i).apply(stmt, row, i)
            }
            i = i + 1
          }

那么问题来了,setNull()方法有两个参数,第一个是字段下标,第二个是空值时的映射类型,就是当插入字段值为空时,告诉数据库这个数值按照那个类型去处理。那么这样就明了,我们指定正确的类型就行了。但是问题又来了,这字段在数据库到底是什么类型呢?这个我们提前是一定不知道,因为spark并不支持所有的数据库字段类型,比如pg中的point类型,映射到spark中是StringType类型,但是反过来从StringType就不能推测出数据库中类型了,因为可能有很多类型都转成StringType类型了。比如:path,jsonb等pg支持的而通用sql又不支持的类型。

        再来看nullTypes(i)这个代码,这里nullTypes这个是一个数组,存储的是每个字段空值的时候对应的映射的类型。他在哪里赋值的呢,看代码

val nullTypes = rddSchema.fields.map(f => getJdbcType(f.dataType, dialect).jdbcNullType) //这里从getJdbcType方法得到的datatype获取空值映射类型

从上面代码我们看到他是在方言实现类的 getJdbcType 方法获得的,我们再看pg的方言中该方法实现

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("TEXT", Types.CHAR)) //注意这里,c1对应的是这个,看出空值时c1的NULL值会按照CHAR类型插入,自然会报错
    case BinaryType => Some(JdbcType("BYTEA", Types.BINARY))
    case BooleanType => Some(JdbcType("BOOLEAN", Types.BOOLEAN))
    case FloatType => Some(JdbcType("FLOAT4", Types.FLOAT))
    case DoubleType => Some(JdbcType("FLOAT8", Types.DOUBLE))
    case ShortType => Some(JdbcType("SMALLINT", Types.SMALLINT))
    case t: DecimalType => Some(
      JdbcType(s"NUMERIC(${t.precision},${t.scale})", java.sql.Types.NUMERIC))
    case ArrayType(et, _) if et.isInstanceOf[AtomicType] =>
      getJDBCType(et).map(_.databaseTypeDefinition)
        .orElse(JdbcUtils.getCommonJDBCType(et).map(_.databaseTypeDefinition))
        .map(typeName => JdbcType(s"$typeName[]", java.sql.Types.ARRAY))
    case ByteType => throw new IllegalArgumentException(s"Unsupported type in postgresql: $dt");
    case _ => None
  }

从代码 case StringType => Some(JdbcType("TEXT", Types.CHAR)) 中我们看到c1对应的StringType类型,空值时c1的NULL值会按照CHAR类型插入,不是数据库中真实类型point类型,自然会报错。

        那么怎么处理?其实我们可以指定一个空类型让pg自己决定最终使用哪种类型插入,避免定死了类型后强转不了儿报错。那么就重写PostgresDialect方言,重写 getJDBCType 方法,修改如下(java版本):

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Types;

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.JdbcType;
import org.apache.spark.sql.types.ArrayType$;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.ByteType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DateType$;
import org.apache.spark.sql.types.DecimalType;
import org.apache.spark.sql.types.DecimalType$;
import org.apache.spark.sql.types.DoubleType$;
import org.apache.spark.sql.types.FloatType$;
import org.apache.spark.sql.types.IntegerType$;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.ShortType$;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.TimestampType$;

import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.JavaConverters;
import scala.collection.immutable.Map;

public class PostgresDialect extends JdbcDialect {
	private static final long serialVersionUID = -5826284056572945657L;

	@Override
	public boolean canHandle(String url) {
		return url.startsWith("jdbc:postgresql:");
	}
	
	@SuppressWarnings({ "rawtypes", "unchecked" })
	@Override
	public Option getCatalystType(int sqlType, String typeName, int size, MetadataBuilder md) {
		if (sqlType == Types.REAL) {
			return Option$.MODULE$.apply(FloatType$.MODULE$);
		} else if (sqlType == Types.SMALLINT) {
			return Option$.MODULE$.apply(ShortType$.MODULE$);
		} else if (sqlType == Types.BIT && typeName.equals("bit") && size != 1) {
			return Option$.MODULE$.apply(BinaryType$.MODULE$);
		} else if (sqlType == Types.OTHER) {
			return Option$.MODULE$.apply(StringType$.MODULE$);
		} else if (sqlType == Types.ARRAY) {
			int scale = (int) md.build().getLong("scale");
			return Option$.MODULE$.apply(ArrayType$.MODULE$.apply(toCatalystType(typeName, size, scale).get()));
		} else {
			return None$.MODULE$;
		}
	}

	private Option<DataType> toCatalystType(String typeName, int precision, int scale) {
		switch (typeName) {
		case "bool":
			return Option$.MODULE$.apply(BinaryType$.MODULE$);
		case "int2":
			return Option$.MODULE$.apply(ShortType$.MODULE$);
		case "int4":
			return Option$.MODULE$.apply(IntegerType$.MODULE$);
		case "int8":
		case "oid":
			return Option$.MODULE$.apply(LongType$.MODULE$);
		case "float4":
			return Option$.MODULE$.apply(FloatType$.MODULE$);
		case "money":
		case "float8":
			return Option$.MODULE$.apply(DoubleType$.MODULE$);
		case "text":
		case "varchar":
		case "char":
		case "cidr":
		case "inet":
		case "json":
		case "jsonb":
		case "uuid":
			return Option$.MODULE$.apply(StringType$.MODULE$);
		case "bytea":
			return Option$.MODULE$.apply(BinaryType$.MODULE$);
		case "timestamp":
		case "timestamptz":
		case "time":
		case "timetz":
			return Option$.MODULE$.apply(TimestampType$.MODULE$);
		case "date":
			return Option$.MODULE$.apply(DateType$.MODULE$);
		case "numeric":
		case "decimal":
			if (precision > 0) {
				return Option$.MODULE$.apply(new DecimalType(Math.min(precision, DecimalType$.MODULE$.MAX_PRECISION()),
						Math.min(scale, DecimalType$.MODULE$.MAX_SCALE())));
			} else {
				return Option$.MODULE$.apply(new DecimalType(DecimalType$.MODULE$.MAX_PRECISION(), 18));
			}
		default:
			return null;
		}

	}

	@SuppressWarnings({ "unchecked", "rawtypes" })
	@Override
	public Option<JdbcType> getJDBCType(DataType dt) {
		Object obj;
		DataType datatype = dt;
		if (StringType$.MODULE$.equals(datatype)) {
			obj = new Some(new JdbcType("TEXT", Types.NULL)); // 改成Types.NULL
		} else if (BinaryType$.MODULE$.equals(datatype)) {
			obj = new Some(new JdbcType("BYTEA", Types.BINARY));
		} else if (BooleanType$.MODULE$.equals(datatype)) {
			obj = new Some(new JdbcType("BOOLEAN", Types.BOOLEAN));
		} else if (FloatType$.MODULE$.equals(datatype)) {
			obj = new Some(new JdbcType("FLOAT4", Types.FLOAT));
		} else if (DoubleType$.MODULE$.equals(datatype)) {
			obj = new Some(new JdbcType("FLOAT8", Types.DOUBLE));
		} else if (ShortType$.MODULE$.equals(datatype)) {
			obj = new Some(new JdbcType("SMALLINT", Types.SMALLINT));
		} else if (DecimalType$.MODULE$.SYSTEM_DEFAULT().equals(datatype)) {
			obj = new Some(new JdbcType("NUMBER(38,18)", Types.NUMERIC));
		} else if (ByteType$.MODULE$.equals(datatype)) {
			throw new IllegalArgumentException("Unsupported type in postgresql:" + dt);
		} else {
			obj = None$.MODULE$;
		}
		return ((Option) (obj));
	}

	@Override
	public String getTableExistsQuery(String table) {
		return "SELECT 1 FROM " + table + " LIMIT 1";
	}

	@Override
	public String getTruncateQuery(String table, Option<Object> cascade) {
		Object object = cascade.get();
		if (object != null && Boolean.valueOf(object.toString())) {
			return "TRUNCATE TABLE ONLY " + table + " CASCADE";
		}
		return "TRUNCATE TABLE ONLY" + table;

	}

	@Override
	public void beforeFetch(Connection connection, Map<String, String> properties) {
		super.beforeFetch(connection, properties);
		java.util.Map<String, String> javaMap = JavaConverters.mapAsJavaMapConverter(properties).asJava();
		String stringOption = javaMap.get(JDBCOptions.JDBC_BATCH_FETCH_SIZE());
		if (!stringOption.isEmpty() && Integer.valueOf(stringOption) > 0) {
			try {
				connection.setAutoCommit(false);
			} catch (SQLException e) {
				throw new RuntimeException(e);
			}
		}
	}

}

主要修改这里

if (StringType$.MODULE$.equals(datatype)) {
            obj = new Some(new JdbcType("TEXT", Types.NULL)); // 改成Types.NULL
        }

然后卸载原来的方言,只用自定义的方言即可。

JdbcDialects.unregisterDialect(org.apache.spark.sql.jdbc.PostgresDialect$.MODULE$); //卸载原来的方言实现
JdbcDialects.registerDialect(new PostgresDialect()); //注册自定义的方言实现

//数据操作逻辑
 ........

     声明:文中有定义命名不一致的地方请不要计较这些细节,只是举例说明无需严格对应关系。



这篇关于sparksql插入postgresql 字段格式不匹配报错处理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程