Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Scrivi su Amazon Kinesis Data Streams utilizzando Kinesis Agent
L'agente Kinesis è un'applicazione software Java autonoma che offre un modo semplice per raccogliere e inviare dati al flusso di dati Kinesis. L'agente controlla costantemente un set di file e invia i nuovi dati al flusso. L'agente gestisce la rotazione dei file, i checkpoint e i tentativi in caso di errori. Fornisce tutti i dati in un modo affidabile, tempestivo e semplice. Inoltre, emette i CloudWatch parametri di Amazon per aiutarti a monitorare e risolvere meglio il processo di streaming.
Come impostazione predefinita, i record vengono analizzati da ciascun file in base alla nuova riga di caratteri ('\n'
). Tuttavia, l'agente può anche essere configurato per analizzare record a più righe (consulta Specificare le impostazioni di configurazione dell'agente).
Puoi installare l'agente su ambienti server basati su Linux, come server Web, server di log e server di database. Dopo aver installato l'agente, configurarlo specificando i file da monitorare e il flusso per i dati. Dopo che l'agente è configurato, raccoglie i dati dal file in modo durevole e li invia al flusso in modo affidabile.
Argomenti
Completa i prerequisiti per Kinesis Agent
-
Il sistema operativo deve essere Amazon Linux AMI con versione 2015.09 o successiva o Red Hat Enterprise Linux versione 7 o successiva.
-
Se utilizzi Amazon EC2 per eseguire il tuo agente, avvia l'EC2istanza.
-
Gestisci AWS le tue credenziali utilizzando uno dei seguenti metodi:
-
Specificate un IAM ruolo all'avvio dell'EC2istanza.
-
Specificate AWS le credenziali quando configurate l'agente (vedi awsAccessKeyID e awsSecretAccesschiave).
-
Modifica
/etc/sysconfig/aws-kinesis-agent
per specificare la regione e le chiavi di AWS accesso. -
Se l'EC2istanza si trova in un AWS account diverso, crea un IAM ruolo per fornire l'accesso al servizio Kinesis Data Streams e specifica quel ruolo quando configuri l'agente assumeRoleARN(assumeRoleExternalvedi e Id). Utilizza uno dei metodi precedenti per specificare le AWS credenziali di un utente nell'altro account che ha il permesso di assumere questo ruolo.
-
-
Il IAM ruolo o AWS le credenziali specificati devono disporre dell'autorizzazione per eseguire l'operazione Kinesis Data PutRecordsStreams affinché l'agente possa inviare dati al tuo stream. Se abiliti il CloudWatch monitoraggio per l'agente, è necessaria anche l'autorizzazione a eseguire l' CloudWatch PutMetricDataoperazione. Per ulteriori informazioni, vedere Controllo dell'accesso alle risorse di Amazon Kinesis Data Streams tramite IAMMonitora lo stato di salute dell'agente Kinesis Data Streams con Amazon CloudWatch, e Controllo degli CloudWatch accessi.
Scarica e installa l'agente
Innanzitutto connettiti all'istanza, Per ulteriori informazioni, consulta Connect to Your Instance nella Amazon EC2 User Guide. In caso di problemi di connessione, consulta Risoluzione dei problemi di connessione alla tua istanza nella Amazon EC2 User Guide.
Per configurare l'agente utilizzando Amazon Linux AMI
Utilizzare il seguente comando per scaricare e installare l'agente:
sudo yum install –y aws-kinesis-agent
Per configurare l'agente utilizzando Red Hat Enterprise Linux
Utilizzare il seguente comando per scaricare e installare l'agente:
sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
Per configurare l'agente utilizzando GitHub
-
Scarica l'agente da awlabs/ amazon-kinesis-agent
. -
Installare l'agente spostandosi nella directory di download ed eseguendo il comando seguente:
sudo ./setup --install
Configurazione dell'agente in un container Docker
L'agente Kinesis può essere eseguito anche in un container tramite la base container amazonlinux. Utilizza il seguente Dockerfile e poi esegui docker build
.
FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]
Configura e avvia l'agente
Configurazione e avvio dell'agente
-
Aprire e modificare il file di configurazione (come superutente se vengono utilizzate le autorizzazioni predefinite di accesso al file):
/etc/aws-kinesis/agent.json
In questo file di configurazione, specificare i file (
"filePattern"
) dai quali l'agente raccoglie i dati e il nome del flusso ("kinesisStream"
) al quale l'agente invia i dati. Si noti che il nome del file è un modello e l'agente riconosce le rotazioni dei file. Puoi ruotare i file o creare nuovi file non più di una volta al secondo. L'agente usa il timestamp di creazione di file per stabilire quale file tracciare e inserire nel flusso; la creazione di nuovi file o la rotazione di file più frequentemente di una volta al secondo non consente all'agente di distinguerli in modo corretto.{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "yourkinesisstream
" } ] } -
Avvia l'agente manualmente:
sudo service aws-kinesis-agent start
-
(Facoltativo) Configurare l'agente per iniziare l'avvio del sistema:
sudo chkconfig aws-kinesis-agent on
L'agente è ora in esecuzione come servizio di sistema in background. Controlla costantemente i file specificati e invia i dati al flusso specificato. L'attività dell'agente viene registrata in /var/log/aws-kinesis-agent/aws-kinesis-agent.log
.
Specificare le impostazioni di configurazione dell'agente
L'agente supporta le due impostazioni di configurazione obbligatorie filePattern
e kinesisStream
, più impostazioni di configurazione opzionali per funzionalità aggiuntive. Puoi specificare la configurazione obbligatoria e opzionale in /etc/aws-kinesis/agent.json
.
Quando modifichi il file di configurazione, devi arrestare e avviare l'agente, utilizzando i comandi seguenti:
sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start
In alternativa, potresti utilizzare il comando seguente:
sudo service aws-kinesis-agent restart
Seguono le impostazioni di configurazione generali.
Impostazione di configurazione | Descrizione |
---|---|
assumeRoleARN |
Il ARN ruolo che deve essere assunto dall'utente. Per ulteriori informazioni, consulta Delegare l'accesso tra AWS account utilizzando i IAM ruoli nella Guida per l'IAMutente. |
assumeRoleExternalId |
Si è verificato un identificatore opzionale che determina chi può assumere il ruolo. Per ulteriori informazioni, consulta Come utilizzare un ID esterno nella Guida per l'IAMutente. |
awsAccessKeyId |
AWS ID della chiave di accesso che sostituisce le credenziali predefinite. Questa impostazione ha la precedenza su tutti gli altri provider di credenziali. |
awsSecretAccessKey |
AWS chiave segreta che sostituisce le credenziali predefinite. Questa impostazione ha la precedenza su tutti gli altri provider di credenziali. |
cloudwatch.emitMetrics |
Consente all'agente di emettere metriche su CloudWatch if set (true). Impostazione predefinita: true |
cloudwatch.endpoint |
L'endpoint regionale per. CloudWatch Impostazione predefinita: |
kinesis.endpoint |
L'endpoint regionale per il flusso di dati Kinesis. Impostazione predefinita: |
Seguono le impostazioni di configurazione del flusso.
Impostazione di configurazione | Descrizione |
---|---|
dataProcessingOptions |
L'elenco delle opzioni di elaborazione applicate a ciascun record analizzato prima dell'invio al flusso. Le opzioni di elaborazione vengono eseguite nell'ordine specificato. Per ulteriori informazioni, consulta Usa l'agente per pre-elaborare i dati. |
kinesisStream |
[Obbligatorio] Il nome del flusso. |
filePattern |
[Obbligatorio] La directory e il modello di file che devono corrispondere per essere rilevati dall'agente. Per tutti i file che corrispondono a questo modello, le autorizzazioni di lettura devono essere concesse a |
initialPosition |
La posizione iniziale dalla quale è iniziata l'analisi del file. I valori validi sono Impostazione predefinita: |
maxBufferAgeMillis |
Il tempo massimo, in millisecondi, in cui l'agente effettua il buffer dati prima di inviarli al flusso. Intervallo valore: da 1.000 a 900.000 (da 1 secondo a 15 minuti) Impostazione predefinita: 60.000 (1 minuto) |
maxBufferSizeBytes |
Le dimensioni massime, in byte, in cui l'agente effettua il buffer dati prima di inviarli al flusso. Intervallo valore: da 1 a 4.194.304 (4 MB) Impostazione predefinita: 4.194.304 (4 MB) |
maxBufferSizeRecords |
Il numero massimo di record in cui l'agente effettua il buffer dati prima di inviarli al flusso. Intervallo valore: da 1 a 500 Impostazione predefinita: 500 |
minTimeBetweenFilePollsMillis |
L'intervallo di tempo, in millisecondi, in cui l'agente esegue il polling e analizza i dati nuovi nei file monitorati. Intervallo valore: 1 o più Impostazione predefinita: 100 |
multiLineStartPattern |
Il modello per identificare l'inizio di un record. Un record è composto da una riga corrispondente al modello e da tutte le righe successive non corrispondenti al modello. I valori validi sono espressioni regolari. Come impostazione predefinita, ogni nuova riga nei file di log viene analizzata come un record. |
partitionKeyOption |
Il metodo per generare la chiave di partizione. I valori validi sono Impostazione predefinita: |
skipHeaderLines |
Il numero di righe necessarie perché l'agente salti l'analisi all'inizio dei file monitorati. Intervallo valore: 0 o più Impostazione predefinita: 0 (zero) |
truncatedRecordTerminator |
La stringa che l'agente utilizza per troncare un record analizzato quando le dimensioni del record eccedono il limite di dimensioni del record . (1.000 KB) Impostazione predefinita: |
Monitora più directory di file e scrivi su più flussi
Specificando più impostazioni di configurazione del flusso, puoi configurare l'agente in modo che monitori più directory di file e invii dati a più flussi. Nel seguente esempio di configurazione, l'agente monitora due directory di file e invia i dati rispettivamente a un flusso Kinesis e a un flusso di distribuzione Firehose. Tieni presente che puoi specificare endpoint diversi per Kinesis Data Streams e Firehose in modo che Kinesis stream e Firehose non debbano necessariamente trovarsi nella stessa regione.
{ "cloudwatch.emitMetrics":
true
, "kinesis.endpoint": "https://your/kinesis/endpoint
", "firehose.endpoint": "https://your/firehose/endpoint
", "flows": [ { "filePattern": "/tmp/app1.log*
", "kinesisStream": "yourkinesisstream
" }, { "filePattern": "/tmp/app2.log*
", "deliveryStream": "yourfirehosedeliverystream
" } ] }
Per informazioni più dettagliate sull'utilizzo dell'agente con Firehose, consulta Writing to Amazon Data Firehose with Kinesis Agent.
Usa l'agente per pre-elaborare i dati
L'agente può preelaborare i record analizzati dai file monitorati prima di inviarli al flusso. È possibile abilitare questa funzionalità aggiungendo le impostazioni di configurazione dataProcessingOptions
al flusso di file. Una o più opzioni di elaborazione possono essere aggiunte e saranno eseguite nell'ordine specificato.
L'agente supporta le seguenti opzioni di elaborazione elencate. Poiché l'agente è open source, è possibile sviluppare ulteriormente e ampliare le opzioni di elaborazione. Puoi scaricare l'agente dall’agente Kinesis
Opzioni di elaborazione
SINGLELINE
-
Converte un record a più righe in un record a riga singola rimuovendo i caratteri di nuova riga, gli spazi iniziali e finali.
{ "optionName": "SINGLELINE" }
CSVTOJSON
-
Converte un record dal formato separato da delimitatori al formato. JSON
{ "optionName": "CSVTOJSON", "customFieldNames": [ "
field1
", "field2
",...
], "delimiter": "yourdelimiter
" }customFieldNames
-
[Obbligatorio] I nomi dei campi usati come chiavi in ogni coppia JSON chiave-valore. Ad esempio, se specifichi
["f1", "f2"]
, il record "v1, v2" sarà convertito in{"f1":"v1","f2":"v2"}
. delimiter
-
La stringa utilizzata come delimitatore nel record. L'impostazione predefinita è una virgola (,).
LOGTOJSON
-
Converte un record da un formato di registro a un JSON altro. I formati di log supportati sono Apache Common Log, Apache Combined Log, Apache Error Log e RFC3164 Syslog.
{ "optionName": "LOGTOJSON", "logFormat": "
logformat
", "matchPattern": "yourregexpattern
", "customFieldNames": [ "field1
", "field2
",…
] }logFormat
-
[Obbligatorio] Il formato di inserimento dei log. I seguenti sono i valori possibili:
-
COMMONAPACHELOG
- Il formato Apache Common Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}
". -
COMBINEDAPACHELOG
: il formato Apache Combined Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}
". -
APACHEERRORLOG
: il formato Apache Error Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}
". -
SYSLOG
— Il formato RFC3164 Syslog. Ogni voce di log ha il seguente modello come impostazione predefinita: "%{timestamp} %{hostname} %{program}[%{processid}]: %{message}
".
-
matchPattern
-
Il modello di espressione regolare utilizzato per estrarre valori dalle voci di log. Questa impostazione viene utilizzata se la tua voce di log non è in uno dei formati di log predefiniti. Se questa impostazione viene utilizzata, devi specificare anche
customFieldNames
. customFieldNames
-
I nomi dei campi personalizzati utilizzati come chiavi in ogni coppia JSON chiave-valore. Puoi utilizzare questa impostazione per definire i nomi dei campi per i valori estratti da
matchPattern
oppure sovrascrivere i nomi dei campi predefiniti dei formati di log predefiniti.
Esempio : LOGTOJSON Configurazione
Ecco un esempio di LOGTOJSON
configurazione per una voce di Apache Common Log convertita in JSON formato:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }
Prima della conversione:
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
Dopo la conversione:
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
Esempio : LOGTOJSON Configurazione con campi personalizzati
Ecco un altro esempio di configurazione LOGTOJSON
:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }
Con questa impostazione di configurazione, la stessa voce di Apache Common Log dell'esempio precedente viene convertita nel JSON formato seguente:
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
Esempio : convertire la voce Apache Common Log
La seguente configurazione di flusso converte una voce di Apache Common Log in un record a riga singola nel formato: JSON
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
Esempio : convertire record a più righe
La seguente configurazione del flusso analizza i record a più righe la cui prima riga inizia con "[SEQUENCE=
". Ogni record viene convertito in un record a riga singola. Quindi, i valori vengono estratti dal record in base a un delimitatore di schede. I valori estratti vengono mappati su customFieldNames
valori specificati per formare un record a riga singola in formato. JSON
{ "flows": [ { "filePattern": "
/tmp/app.log*
", "kinesisStream": "my-stream
", "multiLineStartPattern": "\\[SEQUENCE=
", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1
", "field2
", "field3
" ], "delimiter": "\\t
" } ] } ] }
Esempio : LOGTOJSON Configurazione con Match Pattern
Ecco un esempio di LOGTOJSON
configurazione per una voce di Apache Common Log convertita in JSON formato, con l'ultimo campo (byte) omesso:
{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }
Prima della conversione:
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
Dopo la conversione:
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
Usa i comandi dell'agente CLI
Avviare automaticamente l'agente all'avvio del sistema:
sudo chkconfig aws-kinesis-agent on
Controlla lo stato dell'agente:
sudo service aws-kinesis-agent status
Interrompi l'agente:
sudo service aws-kinesis-agent stop
Leggi il file di log dell'agente da questa posizione:
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
Disinstalla l'agente:
sudo yum remove aws-kinesis-agent
FAQ
Esiste un agente Kinesis per Windows?
L'agente Kinesis per Windows è un software diverso dall'agente Kinesis per piattaforme Linux.
Perché l'agente Kinesis rallenta e/o RecordSendErrors
aumenta?
Di solito ciò è dovuto alla limitazione di Kinesis. Controlla la WriteProvisionedThroughputExceeded
metrica per Kinesis Data Streams o la ThrottledRecords
metrica per Firehose Delivery Streams. Qualsiasi aumento rispetto a 0 di questi parametri indica che è necessario aumentare i limiti dei flussi. Per ulteriori informazioni, consulta Limiti del flusso di dati Kinesis e Flussi di consegna di Amazon Firehose.
Una volta esclusa la limitazione, verifica se Kinesis Agent è configurato in modo da monitorare grandi quantità di file di piccole dimensioni. Si verifica un ritardo nel momento in cui Kinesis Agent esegue il tail di un nuovo file, quindi Kinesis Agent dovrebbe eseguire la coda su una piccola quantità di file più grandi. Prova a consolidare i tuoi file di log in file più grandi.
Perché ricevo delle java.lang.OutOfMemoryError
eccezioni?
Kinesis Agent non dispone di memoria sufficiente per gestire il carico di lavoro corrente. Prova ad aumentare, JAVA_START_HEAP
inserire /usr/bin/start-aws-kinesis-agent
e JAVA_MAX_HEAP
riavviare l'agente.
Perché IllegalStateException : connection pool shut down
ricevo delle eccezioni?
Kinesis Agent non dispone di connessioni sufficienti per gestire il carico di lavoro corrente. Prova ad aumentare maxConnections
e maxSendingThreads
a inserire le impostazioni generali della configurazione dell'agente su. /etc/aws-kinesis/agent.json
Il valore predefinito per questi campi è 12 volte superiore ai processori di runtime disponibili. Consulta AgentConfiguration.java
Come posso eseguire il debug di un altro problema con Kinesis Agent?
DEBUG
i log di livello possono essere abilitati in. /etc/aws-kinesis/log4j.xml
Come devo configurare Kinesis Agent?
Più piccolo èmaxBufferSizeBytes
, più frequentemente Kinesis Agent invierà i dati. Ciò può essere utile in quanto riduce i tempi di consegna dei record, ma aumenta anche le richieste al secondo a Kinesis.
Perché Kinesis Agent invia record duplicati?
Ciò si verifica a causa di un'errata configurazione nella coda dei file. Assicurati che ognuno corrisponda fileFlow’s filePattern
a un solo file. Ciò può verificarsi anche se la logrotate
modalità utilizzata è copytruncate
attiva. Prova a passare alla modalità predefinita o crea per evitare duplicazioni. Per ulteriori informazioni sulla gestione dei record duplicati, vedere Gestione dei record duplicati.