「開源」數據同步ETL工具,支持多數據源間的增、刪、改數據同步 etl工具informatica


ETL的工具應用
ETL工具的典型代表有:Informatica、Datastage、OWB、微軟DTS、Beeload、Kettle、久其ETL……
開源的工具有eclipse的etl插件:cloveretl
數據集成:快速實現ETL
ETL的質量問題具體表現為正確性、完整性、一致性、完備性、有效性、時效性和可獲取性等幾個特性 。而影響質量問題的原因有很多,由系統集成和歷史數據造成的原因主要包括:業務系統不同時期系統之間數據模型不一致;業務系統不同時期業務過程有變化;舊系統模塊在運營、人事、財務、辦公系統等相關信息的不一致;遺留系統和新業務、管理系統數據集成不完備帶來的不一致性 。
實現ETL,首先要實現ETL轉換的過程 。體現為以下幾個方面:
1、空值處理:可捕獲字段空值,進行加載或替換為其他含義數據,并可根據字段空值實現分流加載到不同目標庫 。
【「開源」數據同步ETL工具,支持多數據源間的增、刪、改數據同步 etl工具informatica】2、規范化數據格式:可實現字段格式約束定義,對于數據源中時間、數值、字符等數據,可自定義加載格式 。
3、拆分數據:依據業務需求對字段可進行分解 。例,主叫號 861082585313-8148,可進行區域碼和電話號碼分解 。
4、驗證數據正確性:可利用Lookup及拆分功能進行數據驗證 。例如,主叫號861082585313-8148,進行區域碼和電話號碼分解后,可利用Lookup返回主叫網關或交換機記載的主叫地區,進行數據驗證 。
5、數據替換:對于因業務因素,可實現無效數據、缺失數據的替換 。
6、Lookup:查獲丟失數據 Lookup實現子查詢,并返回用其他手段獲取的缺失字段,保證字段完整性 。
7、建立ETL過程的主外鍵約束:對無依賴性的非法數據,可替換或導出到錯誤數據文件中,保證主鍵唯一記錄的加載 。

數據采集 ETL 工具 Elasticsearch-datatran v6.3.9 發布
數據采集ETL工具 Elasticsearch-datatran v6.3.9 發布 。
Elasticsearch-datatran由bboss開源的數據采集同步ETL工具,提供數據采集、數據清洗轉換處理和數據入庫功能 。支持在Elasticsearch、關系數據庫(mysql,oracle,db2,sqlserver、達夢等)、Mongodb、HBase、Hive、Kafka、文本文件、SFTP/FTP多種數據源之間進行海量數據采集同步;支持本地/ftp日志文件實時增量采集到kafka/elasticsearch/database;支持根據字段進行數據記錄切割;支持根據文件路徑信息將不同文件數據寫入不同的數據庫表 。
提供自定義處理采集數據功能,可以按照自己的要求將采集的數據處理到目的地,支持數據來源包括:database,elasticsearch,kafka,mongodb,hbase,file,ftp等,想把采集的數據保存到什么地方,由自己實現CustomOutPut接口處理即可 。
Elasticsearch版本兼容性:支持 各種Elasticsearch版本(1.x,2.x,5.x,6.x,7.x,+)之間相互數據遷移
v6.3.9 功能改進
導入微服務容器組件包:由bboss-rt調整為bboss-bootstrap-rt
gradle坐標
group: 'com.bbossgroups',name: 'bboss-bootstrap-rt',version: "5.8.5",transitive:true
maven坐標
bboss數據采集ETL案例大全
https://esdoc.bbossgroups.com/#/bboss-datasyn-demo

ETL工具有哪些?
幾種 ETL 工具的比較(DataPipeline,Kettle,Talend,Informatica等)
四種工具的比較主要從以下幾方面進行比對:
1、成本:
軟件成本包括多方面,主要包括軟件產品, 售前培訓, 售后咨詢, 技術支持等 。
開源產品本身是免費的,成本主要是培訓和咨詢,所以成本會一直維持在一個較低水平 。
商業產品本身價格很高,但是一般會提供幾次免費的咨詢或支持,所以采用商用軟件最初成本很高,但是逐漸下降 。
手工編碼最初成本不高,主要是人力成本,但后期維護的工作量會越來越大 。
2、易用性:
DataPipeline: 有非常容易使用的 GUI,具有豐富的可視化監控;
Kettle: GUI+Coding;
Informatica: GUI+Coding,有GUI,但是要專門的訓練;
Talend:GUI+Coding,有 GUI 圖形界面但是以 Eclipse 的插件方式提供;
3、技能要求:
DataPipeline:操作簡單,無技術要求;
Kettle: ETL設計, SQL, 數據建模 ;
Informatica: ETL設計, SQL, 數據建模;
Talend:需要寫Java;
4、底層架構:
DataPipeline:分布式,可水平擴展;
Kettle:主從結構非高可用;
Informatica:分布式;
Talend:分布式;
5、數據實時性:
DataPipeline:支持異構數據源的實時同步,速度非常快;
Kettle:不支持實時數據同步;
Informatica:支持實時,效率較低;
Talend:支持實時處理,需要購買高級版本,價格貴;
6、技術支持:
DataPipeline:本地化原廠技術支持;
Kettle:無;
Informatica:主要在美國;
Talend:主要在美國;
7、自動斷點續傳:
DataPipeline:支持;
Kettle:不支持;
Informatica:不支持;
Talend:不支持;
「開源」數據同步ETL工具,支持多數據源間的增、刪、改數據同步
bboss數據同步可以方便地實現多種數據源之間的數據同步功能,支持增、刪、改數據同步,本文為大家程序各種數據同步案例 。
使用Apache-2.0開源協議
通過bboss,可以非常方便地采集database/mongodb/Elasticsearch/kafka/hbase/本地或者Ftp日志文件源數據,經過數據轉換處理后,再推送到目標庫elasticsearch/database/file/ftp/kafka/dummy/logger 。
數據導入的方式
支持各種主流數據庫、各種es版本以及本地/Ftp日志文件數據采集和同步、加工處理
支持從kafka接收數據;經過加工處理的數據亦可以發送到kafka;
支持將單條記錄切割為多條記錄;
可以將加工后的數據寫入File并上傳到ftp/sftp服務器;
支持備份采集完畢日志文件功能,可以指定備份文件保存時長,定期清理超過時長文件;
支持自動清理下載完畢后ftp服務器上的文件;
支持excel、csv文件采集(本地和ftp/sftp)
支持導出數據到excel和csv文件,并支持上傳到ftp/sftp服務器
提供自定義處理采集數據功能,可以自行將采集的數據按照自己的要求進行處理到目的地,支持數據來源包括:database,elasticsearch,kafka,mongodb,hbase,file,ftp等,想把采集的數據保存到什么地方,有自己實現CustomOutPut接口處理即可 。
支持的數據庫: mysql,maridb,postgress,oracle ,sqlserver,db2,tidb,hive,mongodb、HBase等
支持的Elasticsearch版本: 1.x,2.x,5.x,6.x,7.x,8.x,+
支持海量PB級數據同步導入功能
支持將ip轉換為對應的運營商和城市地理坐標位置信息
支持設置數據bulk導入任務結果處理回調函數,對每次bulk任務的結果進行成功和失敗反饋,然后針對失敗的bulk任務通過error和exception方法進行相應處理
支持以下三種作業調度機制:
bboss另一個顯著的特色就是直接基于java語言來編寫數據同步作業程序,基于強大的java語言和第三方工具包,能夠非常方便地加工和處理需要同步的源數據,然后將最終的數據保存到目標庫(Elasticsearch或者數據庫);同時也可以非常方便地在idea或者eclipse中調試和運行同步作業程序,調試無誤后,通過bboss提供的gradle腳本,即可構建和發布出可部署到生產環境的同步作業包 。因此,對廣大的java程序員來說,bboss無疑是一個輕易快速上手的數據同步利器 。
如果需要增量導入,還需要導入sqlite驅動:
如果需要使用xxjob來調度作業任務,還需要導入坐標:
本文從mysql數據庫表td_cms_document導入數據到es中,除了導入上述maven坐標,還需要額外導入mysql驅動坐標(其他數據庫驅動程序自行導入): mysql 5.x驅動依賴包
mysql 8.x驅動依賴包(mysql 8必須采用相應版本的驅動,否則不能正確運行)
私信回復:數據同步ETL工具
或訪問一飛開源:https://code.exmay.com/
調度工具(ETL+任務流)
kettle是一個ETL工具,ETL(Extract-Transform-Load的縮寫,即數據抽取、轉換、裝載的過程) 。
kettle中文名稱叫水壺,該項目的主程序員MATT 希望把各種數據放到一個壺里,然后以一種指定的格式流出 。
所以他的重心是用于數據
oozie是一個工作流,Oozie工作流是放置在控制依賴DAG(有向無環圖 Direct Acyclic Graph)中的一組動作(例如,Hadoop的Map/Reduce作業、Pig作業等),其中指定了動作執行的順序 。
oozie工作流中是有數據流動的,但是重心是在于工作流的定義 。
二者雖然都有相關功能及數據的流動,但是其實用途是不一樣的 。
查看幫助
列舉出所有linux上的數據庫
列舉出所有Window上的數據庫
查看數據庫下的所有表
(1)確定mysql服務啟動正常
查詢控制端口和查詢進程來確定,一下兩種辦法可以確認mysql是否在啟動狀態
辦法1:查詢端口
MySQL監控的TCP的3306端口,如果顯示3306,證明MySQL服務在運行中
辦法二:查詢進程
可以看見mysql的進程
沒有指定數據導入到哪個目錄,默認是/user/root/表名
原因:
如果表中有主鍵,m的值可以設置大于1的值;如果沒有主鍵只能將m值設置成為1;或者要將m值大于1,需要使用--split-by指定一個字段
設置了-m 1 說明只有一個maptask執行數據導入,默認是4個maptask執行導入操作,但是必須指定一個列來作為劃分依據
導入數據到指定目錄
在導入表數據到HDFS使用Sqoop導入工具,我們可以指定目標目錄 。使用參數 --target-dir來指定導出目的地,使用參數—delete-target-dir來判斷導出目錄是否存在,如果存在就刪掉
查詢導入
提示:must contain '$CONDITIONS' in WHERE clause 。
where id <=1 匹配條件
$CONDITIONS:傳遞作用 。
如果 query 后使用的是雙引號,則 $CONDITIONS前必須加轉義符,防止 shell 識別為自己的變量 。
--query時不能使用--table一起使用
需要指定--target-dir路徑
導入到hdfs指定目錄并指定要求
數據導出儲存方式(數據存儲文件格式---( textfil parquet)--as-textfileImports data as plain text (default)--as-parquetfile Imports data to Parquet Files)
導入表數據子集到HDFS
sqoop導入blob數據到hive
對于CLOB,如xml文本,sqoop可以遷移到Hive表,對應字段存儲為字符類型 。
對于BLOB,如jpg圖片,sqoop無法直接遷移到Hive表,只能先遷移到HDFS路徑,然后再使用Hive命令加載到Hive表 。遷移到HDFS后BLOB字段存儲為16進制形式 。
2.1.3導入關系表到Hive
第一步:導入需要的jar包
將我們mysql表當中的數據直接導入到hive表中的話,我們需要將hive的一個叫做hive-exec-1.1.0-cdh5.14.0.jar的jar包拷貝到sqoop的lib目錄下
第二步:開始導入
導入關系表到hive并自動創建hive表
們也可以通過命令來將我們的mysql的表直接導入到hive表當中去
通過這個命令,我們可以直接將我們mysql表當中的數據以及表結構一起倒入到hive當中去
--incremental增量模式 。
append id 是獲取一個某一列的某個值 。
lastmodified “2016-12-15 15:47:35” 獲取某個時間后修改的所有數據
-append 附加模式
-merge-key id 合并模式
--check-column用來指定一些列,可以去指定多個列;通常的是指定主鍵id
--last -value從哪個值開始增量
==注意:增量導入的時候,一定不能加參數--delete-target-dir 否則會報錯==
第一種增量導入方式(不常用)
1.Append方式
使用場景:有個訂單表,里面每個訂單有一個唯一標識的自增列id,在關系型數據庫中以主鍵的形式存在 。之前已經將id在0-1000之間的編號的訂單導入到HDFS 中;如果在產生新的訂單,此時我們只需指定incremental參數為append,--last-value參數為1000即可,表示只從id大于1000后開始導入 。
(1)創建一個MySQL表
(2)創建一個hive表(表結構與mysql一致)
注意:
append 模式不支持寫入到hive表中
2.lastModify方式
此方式要求原有表有time字段,它能指定一個時間戳,讓sqoop把該時間戳之后的數據導入到HDFS;因為后續訂單可能狀體會變化,變化后time字段時間戳也會變化,此時sqoop依然會將相同狀態更改后的訂單導入HDFS,當然我們可以只當merge-key參數為order-id,表示將后續新的記錄和原有記錄合并 。
# 將時間列大于等于閾值的數據增量導入HDFS
使用 lastmodified 方式導入數據,要指定增量數據是要 --append(追加)還是要 --merge-key(合并)last-value 指定的值是會包含于增量導入的數據中 。
第二種增量導入方式(推薦)
==通過where條件選取數據更加精準==
2.1.5從RDBMS到HBase
會報錯
原因:sqoop1.4.6 只支持 HBase1.0.1 之前的版本的自動創建 HBase 表的功能 。
解決方案:手動創建 HBase 表
導出前,目標表必須存在與目標數據庫中
默認操作是將文件中的數據使用insert語句插入到表中
數據是在HDFS當中的如下目錄/sqoop/emp,數據內容如下
第一步:創建MySQL表
第二步:執行導出命令
通過export來實現數據的導出,將hdfs的數據導出到mysql當中去
全量導出
增量導出
更新導出
總結:
參數介紹
--update-key 后面也可以接多個關鍵字列名,可以使用逗號隔開,Sqoop將會匹配多個關鍵字后再執行更新操作 。
--export-dir 參數配合--table或者--call參數使用,指定了HDFS上需要將數據導入到MySQL中的文件集目錄 。
--update-mode updateonly和allowinsert 。默認模式為updateonly,如果指定--update-mode模式為allowinsert,可以將目標數據庫中原來不存在的數據也導入到數據庫表中 。即將存在的數據更新,不存在數據插入 。
組合測試及說明
1、當指定update-key,且關系型數據庫表存在主鍵時:
A、allowinsert模式時,為更新目標數據庫表存的內容,并且原來不存在的數據也導入到數據庫表;
B、updateonly模式時,為更新目標數據庫表存的內容,并且原來不存在的數據也不導入到數據庫表;
2、當指定update-key,且關系型數據庫表不存在主鍵時:
A、allowinsert模式時,為全部數據追加導入到數據庫表;
B、updateonly模式時,為更新目標數據庫表存的內容,并且原來不存在的數據也不導入到數據庫表;
3、當不指定update-key,且關系型數據庫表存在主鍵時:
A、allowinsert模式時,報主鍵沖突,數據無變化;
B、updateonly模式時,報主鍵沖突,數據無變化;
4、當不指定update-key,且關系型數據庫表不存在主鍵時:
A、allowinsert模式時,為全部數據追加導入到數據庫表;
B、updateonly模式時,為全部數據追加導入到數據庫表;
實際案例:
(1)mysql批量導入hive
使用shell腳本:
筆者目前用sqoop把mysql數據導入到Hive中,最后實現命令行導入,sqoop版本1.4.7,實現如下
最后需要把這個導入搞成job,每天定時去跑,實現數據的自動化增量導入,sqoop支持job的管理,可以把導入創建成job重復去跑,并且它會在metastore中記錄增值,每次執行增量導入之前去查詢
創建job命令如下
創建完job就可以去執行它了
sqoop job --exec users
可以把該指令設為Linux定時任務,或者用Azkaban定時去執行它
hive導出到MySQL時,date類型數據發生變化?
問題原因:時區設置問題,date -R查看服務器時間,show VARIABLES LIKE "%time_zone"查看Mysql時間,system并不表示中國的標準時間,要將時間設置為東八區
(1):對市面上最流行的兩種調度器,給出以下詳細對比,以供技術選型參考 。總體來說,ooize相比azkaban是一個重量級的任務調度系統,功能全面,但配置使用也更復雜 。如果可以不在意某些功能的缺失,輕量級調度器azkaban是很不錯的候選對象 。
(2):功能:
兩者均可以調度mapreduce,pig,java,腳本工作流任務;
兩者均可以定時執行工作流任務;
(3):工作流定義:
Azkaban使用Properties文件定義工作流;
Oozie使用XML文件定義工作流;
(4):工作流傳參:
Azkaban支持直接傳參,例如${input};
Oozie支持參數和EL表達式,例如${fs:dirSize(myInputDir)};
(5):定時執行:
Azkaban的定時執行任務是基于時間的;
Oozie的定時執行任務基于時間和輸入數據;
(6):資源管理:
Azkaban有較嚴格的權限控制,如用戶對工作流進行讀/寫/執行等操作;
Oozie暫無嚴格的權限控制;
(7):工作流執行:
Azkaban有兩種運行模式,分別是solo server mode(executor server和web server部署在同一臺節點)和multi server mode(executor server和web server可以部署在不同節點);
Oozie作為工作流服務器運行,支持多用戶和多工作流;
(8):工作流管理:
Azkaban支持瀏覽器以及ajax方式操作工作流;
Oozie支持命令行、HTTP REST、Java API、瀏覽器操作工作流;
瀏覽器頁面訪問
http://node03:8081/
使用Oozie時通常整合hue,用戶數據倉庫調度
就是剛才選擇的腳本
腳本里需要的參數,盡量設置為動態自動獲取,如 ${date}
第一步的參數是所有文件和當天日期,后面的只需要日期,最后一步是導出所有結果,相應填入
添加文件和設置相應參數
運行后會有狀態提示頁面,可以看到任務進度
點擊調度任務的頁面情況
修改定時任務名和描述
添加需要定時調度的任務
sm-workflow的參數都是寫死的,沒有設置動態,這里的下拉列表就不會有可選項 。
設置參數
將sm-workflow的日期修改為 ${do_date},保存
進入定時計劃sm-dw中,會看到有參數 do_date
填入相應參數,前一天日期
Oozie常用系統常量
當然,也可以通過這樣將參數傳入workflow任務中,代碼或者shell中需要的參數 。
如,修改sm-workflow 中的 sqoop_import.sh,添加一個參數 ${num} 。
編輯文件(需要登陸Hue的用戶有對HDFS操作的權限),修改shell中的一個值為參數,保存 。
在workflow中,編輯添加參數 ${num} ,或者num=${num} 保存 。
進入schedule中,可以看到添加的參數,編輯輸入相應參數即可 。
Bundle統一管理所有定時調度,階段劃分:Bundle > Schedule > workflow

ETL工具之日志采集filebeat+logstash
原文地址: https://www.jianshu.com/p/7aa55172c3e2
web服務產生的日志文件,需要進行日志收集并進行可視化展示,一般使用filebeat和logstash組合 。
Logstash是具有實時收集日志功能,可以動態統一來自不同來源的數據,任何類型的事件都可以通過各種各樣的輸入,過濾功能和輸出插件來豐富和轉換 。是一個重量級的服務,很占用內存,會影響到部署到本機器上的服務 。
Filebeat是用于轉發和采集日志數據的輕量級服務 。能監視您指定的日志文件或位置,收集日志事件,并將它們轉發到Logstash或elasticsearch (在 5.x 版本中,它也具備過濾的能力,但是還不及Logstash豐富)
如果對于日志不需要進行過濾分析的,可以直接使用filebeat
如果需要對日志進行過濾分析,可以使用filebeat+Logstash最合適,如果單獨使用Logstash,多臺機都需部署Logstash,每臺機消耗資源大,filebeat+Logstash相結合,每臺機部署filebeat進行數據采集,一臺機部署Logstash作為中心進行接收數據處理以及存儲到不同的地方,
Logstash
Filebeat
filebeat文檔:https://www.elastic.co/guide/en/beats/filebeat/current/index.html
logstash文檔:https://www.elastic.co/guide/en/logstash/7.3

關于etl工具和etl工具informatica的內容就分享到這兒!更多實用知識經驗,盡在 m.apearl.cn