/usr/share/grafana/public/app/plugins/datasource/loki
import { noop } from 'lodash';
import { Observable, Subject, of, throwError, concat } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
import * as rxJsWebSocket from 'rxjs/webSocket';

import { DataFrame, DataFrameView, formatLabels, Labels } from '@grafana/data';

import { LiveStreams } from './LiveStreams';
import { LokiTailResponse } from './types';

/** Error with an optional numeric `code`; the reconnect test sets it to 1006. */
interface ErrorException extends Error {
  code?: number;
}

/**
 * Socket returned by the mocked `webSocket` factory below. Tests that rely on the
 * factory assign a fresh Subject here before calling `getStream`, then push tail
 * responses into it by hand.
 */
let fakeSocket: Subject<LokiTailResponse>;

// Replace the `rxjs/webSocket` module so `webSocket()` returns whatever `fakeSocket`
// currently points to.
jest.mock('rxjs/webSocket', () => {
  return {
    __esModule: true,
    webSocket: () => fakeSocket,
  };
});

describe('Live Stream Tests', () => {
  // Restore after every test (rather than once at the end) so that a `webSocket` spy
  // installed by one test is not still active while a later test runs.
  afterEach(() => {
    jest.restoreAllMocks();
  });

  // A single-entry tail response used as the "live update" message.
  const msg0: LokiTailResponse = {
    streams: [
      {
        stream: { filename: '/var/log/sntpc.log', job: 'varlogs' },
        values: [['1567025440118944705', 'Kittens']],
      },
    ],
    dropped_entries: null,
  };

  it('reads the values into the buffer', (done) => {
    fakeSocket = new Subject<LokiTailResponse>();
    const labels: Labels = { job: 'varlogs' };
    const target = makeTarget('fake', labels);
    const stream = new LiveStreams().getStream(target);
    expect.assertions(3);

    // One check per expected emission, consumed in order.
    const tests = [
      // After the initial response: the 7 entries of `initialRawResponse`.
      (val: DataFrame[]) => {
        expect(val[0].length).toEqual(7);
      },
      // After `msg0`: one more row, and it is the last one in the frame.
      (val: DataFrame[]) => {
        expect(val[0].length).toEqual(8);
        const view = new DataFrameView(val[0]);
        const last = { ...view.get(view.length - 1) };
        expect(last).toEqual({
          Time: '2019-08-28T20:50:40.118Z',
          id: 'A_25d81461-a66f-53ff-98d5-e39515af4735',
          Line: 'Kittens',
        });
      },
    ];

    stream.subscribe({
      next: (val) => {
        const test = tests.shift();
        if (!test) {
          // Fail with a clear message instead of a TypeError from calling `undefined`.
          throw new Error('Live stream emitted more values than this test expects');
        }
        test(val);
      },
      complete: () => done(),
    });

    // Send it the initial list of things
    fakeSocket.next(initialRawResponse);
    // Send it a single update
    fakeSocket.next(msg0);
    fakeSocket.complete();
  });

  it('returns the same subscription if the url matches existing one', () => {
    fakeSocket = new Subject<LokiTailResponse>();
    const liveStreams = new LiveStreams();
    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
    const stream2 = liveStreams.getStream(makeTarget('url_to_match'));
    expect(stream1).toBe(stream2);
  });

  it('returns new subscription when the previous unsubscribed', () => {
    fakeSocket = new Subject<LokiTailResponse>();
    const liveStreams = new LiveStreams();
    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
    const subscription = stream1.subscribe({
      next: noop,
    });
    subscription.unsubscribe();

    const stream2 = liveStreams.getStream(makeTarget('url_to_match'));
    expect(stream1).not.toBe(stream2);
  });

  it('returns new subscription when the previous is unsubscribed and correctly unsubscribes from source', () => {
    let unsubscribed = false;
    // Never emits; its teardown only records that the source was unsubscribed.
    // Named differently from the module-level `fakeSocket` to avoid shadowing it.
    const sourceSocket = new Observable(() => {
      return () => (unsubscribed = true);
    });
    jest.spyOn(rxJsWebSocket, 'webSocket').mockReturnValue(sourceSocket as rxJsWebSocket.WebSocketSubject<unknown>);

    const liveStreams = new LiveStreams();
    const stream1 = liveStreams.getStream(makeTarget('url_to_match'));
    const subscription = stream1.subscribe({
      next: noop,
    });
    subscription.unsubscribe();

    expect(unsubscribed).toBe(true);
  });

  it('should reconnect when abnormal error', async () => {
    const abnormalError = new Error('weird error') as ErrorException;
    abnormalError.code = 1006;

    const logStreamBeforeError = of({
      streams: [
        {
          stream: { filename: '/var/log/sntpc.log', job: 'varlogs' },
          values: [['1567025440118944705', 'Kittens']],
        },
      ],
      dropped_entries: null,
    });
    const logStreamAfterError = of({
      streams: [
        {
          stream: { filename: '/var/log/sntpc.log', job: 'varlogs' },
          values: [['1567025440118944705', 'Doggos']],
        },
      ],
      dropped_entries: null,
    });
    const errorStream = throwError(abnormalError);

    // Counts how many times the fake socket has been subscribed to.
    let retries = 0;
    // Named differently from the module-level `fakeSocket` to avoid shadowing it.
    const reconnectingSocket = of({}).pipe(
      mergeMap(() => {
        // When subscribed first time, return logStream and errorStream
        if (retries++ === 0) {
          return concat(logStreamBeforeError, errorStream);
        }
        // When re-subscribed after abnormal error, return just logStream
        return logStreamAfterError;
      })
    );
    jest
      .spyOn(rxJsWebSocket, 'webSocket')
      .mockReturnValue(reconnectingSocket as rxJsWebSocket.WebSocketSubject<unknown>);

    const liveStreams = new LiveStreams();
    // NOTE(review): the second argument (100) is presumably the retry interval - confirm
    // against the `LiveStreams.getStream` signature.
    await expect(liveStreams.getStream(makeTarget('url_to_match'), 100)).toEmitValuesWith((received) => {
      const data = received[0];
      const view = new DataFrameView(data[0]);
      const firstLog = { ...view.get(0) };
      const secondLog = { ...view.get(1) };

      expect(firstLog.Line).toBe('Kittens');
      expect(secondLog.Line).toBe('Doggos');
      expect(retries).toBe(2);
    });
  });
});

/**
 * Create target (query to run). Url is what is used as cache key.
 *
 * @param url - Target url.
 * @param labels - Labels formatted into the target's query; defaults to `{ job: 'varlogs' }`.
 * @returns A target object with fixed `size`, `refId` and `regexp` values.
 */
function makeTarget(url: string, labels?: Labels) {
  const queryLabels = labels ?? { job: 'varlogs' };
  return {
    url,
    size: 10,
    query: formatLabels(queryLabels),
    refId: 'A',
    regexp: '',
  };
}

//----------------------------------------------------------------
// Added this at the end so the top is more readable
//----------------------------------------------------------------

// Initial tail response with seven log entries, sent before any live update.
const initialRawResponse: LokiTailResponse = {
  streams: [
    {
      stream: {
        filename: '/var/log/docker.log',
        job: 'varlogs',
      },
      values: [
        [
          '1567025018215000000',
          'level=debug msg="[resolver] received AAAA record \\"::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
        ],
        [
          '1567025018215000000',
          '2019-08-28T20:43:38Z docker time="2019-08-28T20:43:38.147224630Z" ' +
            'level=debug msg="[resolver] received AAAA record \\"fe80::1\\" for \\"localhost.\\" from udp:192.168.65.1"',
        ],
        ['1567025020452000000', '2019-08-28T20:43:40Z sntpc sntpc[1]: offset=-0.022171, delay=0.000463'],
        ['1567025050297000000', '2019-08-28T20:44:10Z sntpc sntpc[1]: offset=-0.022327, delay=0.000527'],
        [
          '1567025078152000000',
          '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095444834Z" ' +
            'level=debug msg="Name To resolve: localhost."',
        ],
        [
          '1567025078152000000',
          '2019-08-28T20:44:38Z lifecycle-server time="2019-08-28T20:44:38.095896074Z" ' +
            'level=debug msg="[resolver] query localhost. (A) from 172.22.0.4:53748, forwarding to udp:192.168.65.1"',
        ],
        [
          '1567025078152000000',
          '2019-08-28T20:44:38Z docker time="2019-08-28T20:44:38.095444834Z" level=debug msg="Name To resolve: localhost."',
        ],
      ],
    },
  ],
  dropped_entries: null,
};
.
Edit
..
Edit
CHANGELOG.md
Edit
LanguageProvider.test.ts
Edit
LanguageProvider.ts
Edit
LiveStreams.test.ts
Edit
LiveStreams.ts
Edit
LogContextProvider.test.ts
Edit
LogContextProvider.ts
Edit
LokiVariableSupport.test.ts
Edit
LokiVariableSupport.ts
Edit
README.md
Edit
backendResultTransformer.test.ts
Edit
backendResultTransformer.ts
Edit
components
Edit
configuration
Edit
dataquery.cue
Edit
dataquery.gen.ts
Edit
datasource.test.ts
Edit
datasource.ts
Edit
dist
Edit
docs
Edit
getDerivedFields.test.ts
Edit
getDerivedFields.ts
Edit
img
Edit
jest-setup.js
Edit
jest.config.js
Edit
languageUtils.test.ts
Edit
languageUtils.ts
Edit
language_utils.test.ts
Edit
lineParser.test.ts
Edit
lineParser.ts
Edit
liveStreamsResultTransformer.test.ts
Edit
liveStreamsResultTransformer.ts
Edit
logsTimeSplitting.test.ts
Edit
logsTimeSplitting.ts
Edit
makeTableFrames.test.ts
Edit
makeTableFrames.ts
Edit
mergeResponses.test.ts
Edit
mergeResponses.ts
Edit
metricTimeSplitting.test.ts
Edit
metricTimeSplitting.ts
Edit
migrations
Edit
mocks
Edit
modifyQuery.test.ts
Edit
modifyQuery.ts
Edit
module.test.ts
Edit
module.ts
Edit
package.json
Edit
plugin.json
Edit
project.json
Edit
queryHints.test.ts
Edit
queryHints.ts
Edit
querySplitting.test.ts
Edit
querySplitting.ts
Edit
queryUtils.test.ts
Edit
queryUtils.ts
Edit
querybuilder
Edit
responseUtils.test.ts
Edit
responseUtils.ts
Edit
shardQuerySplitting.test.ts
Edit
shardQuerySplitting.ts
Edit
sortDataFrame.test.ts
Edit
sortDataFrame.ts
Edit
streaming.test.ts
Edit
streaming.ts
Edit
syntax.test.ts
Edit
syntax.ts
Edit
tracking.test.ts
Edit
tracking.ts
Edit
tsconfig.json
Edit
types.ts
Edit
webpack.config.ts
Edit