集成与扩展 ========== 为了方便集成与扩展, ChannelQuery 提供可定制化的能力。 内嵌到应用 ----------- 引入 ChannelQuery 脚本引擎 ``Jar`` 包。 示例 ^^^^^ .. code-block:: scala val script = "..." val paramsContext = mutable.Map[String, AnyRef]() .... val context = mutable.Map[String, AnyRef]() // 你的 SparkSession 对象 val rt = SparkSQL.realtime() val ds = ChannelEngine.execute(script, context, rt, paramsContext).asInstanceOf[Dataset[Row]] val r = ds.limit(200).toJSON.collect() // 停止会话,销毁所有临时视图 rt.stop() 自定义JDBC数据源 ---------------------- 默认读取 ``fs/data.sources.json`` 配置文件,根据数据源名称查找,然后创建数据源给 ``load``、 ``save`` 语法使用。 .. code-block:: sql -- demo 是已配置的数据源名称 load jdbc 'select * from orders' option ds='demo'; 自定义JDBC数据源需实现 ``com.torchdb.framework.lang.IDataSource`` 接口,以下为内置数据源实现(Scala版,其它JVM语言参考自身实现接口方式): .. code-block:: scala class DefaultDataSource extends IDataSource { override def source(name: String): DataSource = { var dsName = name if(name.isEmpty){ dsName = "default" } val path = FilenameUtils.concat(System.getProperty("user.dir"), "fs/data.sources.json") val file = new File(path) if (file.exists()) { val mapper = new ObjectMapper() val nodes = mapper.readTree(file) if (nodes.isArray) { val it = nodes.iterator() while (it.hasNext) { val node = it.next() if (node.get("name").asText().equalsIgnoreCase(dsName)) { return DataSource(node.get("url").asText(), node.get("user").asText(), node.get("password").asText(), node.get("driver").asText(), node.get("queryTimeout").asInt(), node.get("fetchSize").asInt()) } } } } DataSource("", "", "", "", 60, 10000) } } 然后在脚本执行前注入: .. code-block:: scala ChannelEngine.dataSourceImplClassName="com.torchdb.framework.lang.DefaultDataSource" 自定义管道函数(UDCF) ---------------------- 当前数据驱动是建构在Spark之上,所以管道函数均以 Dataset[Row] 作为参数,返回处理后的 Dataset[Row]。 自定义管道函数需实现 ``com.torchdb.framework.lang.IFunction`` 接口,以下为内置row函数实现(Scala版,其它JVM语言参考自身实现接口方式): .. code-block:: scala class RowAlias extends IFunction { override def call(value: AnyRef, args: List[String], context: Map[String, AnyRef]): AnyRef = { value match { case ds: Dataset[org.apache.spark.sql.Row] => val rows = new java.util.ArrayList[org.apache.spark.sql.Row]() val results = ds.collect() args.foreach(arg => { rows.add(results(Math.abs(arg.toInt) - 1)) }) if (rows.isEmpty) { rows.add(results(0)) } val s = context("sparkSession").asInstanceOf[SparkSession] return s.createDataFrame(rows, ds.schema) case _ => } value } } 最后,在执行脚本前注册函数 .. code-block:: scala ChannelEngine.registry(funcName, funcImplClassName)